pacemaker  1.1.14-70404b0
Scalable High-Availability cluster resource manager
mainloop.c
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <crm_internal.h>
20 
21 #ifndef _GNU_SOURCE
22 # define _GNU_SOURCE
23 #endif
24 
25 #include <stdlib.h>
26 #include <signal.h>
27 #include <errno.h>
28 
29 #include <sys/wait.h>
30 
31 #include <crm/crm.h>
32 #include <crm/common/xml.h>
33 #include <crm/common/mainloop.h>
34 #include <crm/common/ipcs.h>
35 
36 struct mainloop_child_s {
37  pid_t pid;
38  char *desc;
39  unsigned timerid;
40  unsigned watchid;
41  gboolean timeout;
42  void *privatedata;
43 
45 
46  /* Called when a process dies */
47  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
48 };
49 
50 struct trigger_s {
51  GSource source;
52  gboolean running;
53  gboolean trigger;
54  void *user_data;
55  guint id;
56 
57 };
58 
59 static gboolean
60 crm_trigger_prepare(GSource * source, gint * timeout)
61 {
62  crm_trigger_t *trig = (crm_trigger_t *) source;
63 
64  /* cluster-glue's FD and IPC related sources make use of
65  * g_source_add_poll() but do not set a timeout in their prepare
66  * functions
67  *
68  * This means mainloop's poll() will block until an event for one
69  * of these sources occurs - any /other/ type of source, such as
70  * this one or g_idle_*, that doesn't use g_source_add_poll() is
71  * S-O-L and won't be processed until there is something fd-based
72  * happens.
73  *
74  * Luckily the timeout we can set here affects all sources and
75  * puts an upper limit on how long poll() can take.
76  *
77  * So unconditionally set a small-ish timeout, not too small that
78  * we're in constant motion, which will act as an upper bound on
79  * how long the signal handling might be delayed for.
80  */
81  *timeout = 500; /* Timeout in ms */
82 
83  return trig->trigger;
84 }
85 
86 static gboolean
87 crm_trigger_check(GSource * source)
88 {
89  crm_trigger_t *trig = (crm_trigger_t *) source;
90 
91  return trig->trigger;
92 }
93 
94 static gboolean
95 crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
96 {
97  int rc = TRUE;
98  crm_trigger_t *trig = (crm_trigger_t *) source;
99 
100  if (trig->running) {
101  /* Wait until the existing job is complete before starting the next one */
102  return TRUE;
103  }
104  trig->trigger = FALSE;
105 
106  if (callback) {
107  rc = callback(trig->user_data);
108  if (rc < 0) {
109  crm_trace("Trigger handler %p not yet complete", trig);
110  trig->running = TRUE;
111  rc = TRUE;
112  }
113  }
114  return rc;
115 }
116 
117 static void
118 crm_trigger_finalize(GSource * source)
119 {
120  crm_trace("Trigger %p destroyed", source);
121 }
122 
123 #if 0
124 struct _GSourceCopy
125 {
126  gpointer callback_data;
127  GSourceCallbackFuncs *callback_funcs;
128 
129  const GSourceFuncs *source_funcs;
130  guint ref_count;
131 
132  GMainContext *context;
133 
134  gint priority;
135  guint flags;
136  guint source_id;
137 
138  GSList *poll_fds;
139 
140  GSource *prev;
141  GSource *next;
142 
143  char *name;
144 
145  void *priv;
146 };
147 
148 static int
149 g_source_refcount(GSource * source)
150 {
151  /* Duplicating the contents of private header files is a necessary evil */
152  if (source) {
153  struct _GSourceCopy *evil = (struct _GSourceCopy*)source;
154  return evil->ref_count;
155  }
156  return 0;
157 }
158 #else
159 static int g_source_refcount(GSource * source)
160 {
161  return 0;
162 }
163 #endif
164 
165 static GSourceFuncs crm_trigger_funcs = {
166  crm_trigger_prepare,
167  crm_trigger_check,
168  crm_trigger_dispatch,
169  crm_trigger_finalize,
170 };
171 
172 static crm_trigger_t *
173 mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
174  gpointer userdata)
175 {
176  crm_trigger_t *trigger = NULL;
177 
178  trigger = (crm_trigger_t *) source;
179 
180  trigger->id = 0;
181  trigger->trigger = FALSE;
182  trigger->user_data = userdata;
183 
184  if (dispatch) {
185  g_source_set_callback(source, dispatch, trigger, NULL);
186  }
187 
188  g_source_set_priority(source, priority);
189  g_source_set_can_recurse(source, FALSE);
190 
191  crm_trace("Setup %p with ref-count=%u", source, g_source_refcount(source));
192  trigger->id = g_source_attach(source, NULL);
193  crm_trace("Attached %p with ref-count=%u", source, g_source_refcount(source));
194 
195  return trigger;
196 }
197 
198 void
200 {
201  crm_trace("Trigger handler %p complete", trig);
202  trig->running = FALSE;
203 }
204 
205 /* If dispatch returns:
206  * -1: Job running but not complete
207  * 0: Remove the trigger from mainloop
208  * 1: Leave the trigger in mainloop
209  */
211 mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data), gpointer userdata)
212 {
213  GSource *source = NULL;
214 
215  CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource));
216  source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
217  CRM_ASSERT(source != NULL);
218 
219  return mainloop_setup_trigger(source, priority, dispatch, userdata);
220 }
221 
222 void
224 {
225  if(source) {
226  source->trigger = TRUE;
227  }
228 }
229 
230 gboolean
232 {
233  GSource *gs = NULL;
234 
235  if(source == NULL) {
236  return TRUE;
237  }
238 
239  gs = (GSource *)source;
240 
241  if(g_source_refcount(gs) > 2) {
242  crm_info("Trigger %p is still referenced %u times", gs, g_source_refcount(gs));
243  }
244 
245  g_source_destroy(gs); /* Remove from mainloop, ref_count-- */
246  g_source_unref(gs); /* The caller no longer carries a reference to source
247  *
248  * At this point the source should be free'd,
249  * unless we're currently processing said
250  * source, in which case mainloop holds an
251  * additional reference and it will be free'd
252  * once our processing completes
253  */
254  return TRUE;
255 }
256 
257 typedef struct signal_s {
258  crm_trigger_t trigger; /* must be first */
259  void (*handler) (int sig);
260  int signal;
261 
262 } crm_signal_t;
263 
264 static crm_signal_t *crm_signals[NSIG];
265 
266 static gboolean
267 crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
268 {
269  crm_signal_t *sig = (crm_signal_t *) source;
270 
271  if(sig->signal != SIGCHLD) {
272  crm_notice("Invoking handler for signal %d: %s", sig->signal, strsignal(sig->signal));
273  }
274 
275  sig->trigger.trigger = FALSE;
276  if (sig->handler) {
277  sig->handler(sig->signal);
278  }
279  return TRUE;
280 }
281 
282 static void
283 mainloop_signal_handler(int sig)
284 {
285  if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
286  mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
287  }
288 }
289 
290 static GSourceFuncs crm_signal_funcs = {
291  crm_trigger_prepare,
292  crm_trigger_check,
293  crm_signal_dispatch,
294  crm_trigger_finalize,
295 };
296 
297 gboolean
298 crm_signal(int sig, void (*dispatch) (int sig))
299 {
300  sigset_t mask;
301  struct sigaction sa;
302  struct sigaction old;
303 
304  if (sigemptyset(&mask) < 0) {
305  crm_perror(LOG_ERR, "Call to sigemptyset failed");
306  return FALSE;
307  }
308 
309  memset(&sa, 0, sizeof(struct sigaction));
310  sa.sa_handler = dispatch;
311  sa.sa_flags = SA_RESTART;
312  sa.sa_mask = mask;
313 
314  if (sigaction(sig, &sa, &old) < 0) {
315  crm_perror(LOG_ERR, "Could not install signal handler for signal %d", sig);
316  return FALSE;
317  }
318 
319  return TRUE;
320 }
321 
322 gboolean
323 mainloop_add_signal(int sig, void (*dispatch) (int sig))
324 {
325  GSource *source = NULL;
326  int priority = G_PRIORITY_HIGH - 1;
327 
328  if (sig == SIGTERM) {
329  /* TERM is higher priority than other signals,
330  * signals are higher priority than other ipc.
331  * Yes, minus: smaller is "higher"
332  */
333  priority--;
334  }
335 
336  if (sig >= NSIG || sig < 0) {
337  crm_err("Signal %d is out of range", sig);
338  return FALSE;
339 
340  } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
341  crm_trace("Signal handler for %d is already installed", sig);
342  return TRUE;
343 
344  } else if (crm_signals[sig] != NULL) {
345  crm_err("Different signal handler for %d is already installed", sig);
346  return FALSE;
347  }
348 
349  CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource));
350  source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
351 
352  crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
353  CRM_ASSERT(crm_signals[sig] != NULL);
354 
355  crm_signals[sig]->handler = dispatch;
356  crm_signals[sig]->signal = sig;
357 
358  if (crm_signal(sig, mainloop_signal_handler) == FALSE) {
359  crm_signal_t *tmp = crm_signals[sig];
360 
361  crm_signals[sig] = NULL;
362 
364  return FALSE;
365  }
366 #if 0
367  /* If we want signals to interrupt mainloop's poll(), instead of waiting for
368  * the timeout, then we should call siginterrupt() below
369  *
370  * For now, just enforce a low timeout
371  */
372  if (siginterrupt(sig, 1) < 0) {
373  crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig);
374  }
375 #endif
376 
377  return TRUE;
378 }
379 
380 gboolean
382 {
383  crm_signal_t *tmp = NULL;
384 
385  if (sig >= NSIG || sig < 0) {
386  crm_err("Signal %d is out of range", sig);
387  return FALSE;
388 
389  } else if (crm_signal(sig, NULL) == FALSE) {
390  crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
391  return FALSE;
392 
393  } else if (crm_signals[sig] == NULL) {
394  return TRUE;
395  }
396 
397  crm_trace("Destroying signal %d", sig);
398  tmp = crm_signals[sig];
399  crm_signals[sig] = NULL;
401  return TRUE;
402 }
403 
404 static qb_array_t *gio_map = NULL;
405 
406 void
408 {
409  if(gio_map) {
410  qb_array_free(gio_map);
411  }
412 }
413 
414 /*
415  * libqb...
416  */
417 struct gio_to_qb_poll {
418  int32_t is_used;
419  guint source;
420  int32_t events;
421  void *data;
422  qb_ipcs_dispatch_fn_t fn;
423  enum qb_loop_priority p;
424 };
425 
426 static gboolean
427 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
428 {
429  struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
430  gint fd = g_io_channel_unix_get_fd(gio);
431 
432  crm_trace("%p.%d %d", data, fd, condition);
433 
434  /* if this assert get's hit, then there is a race condition between
435  * when we destroy a fd and when mainloop actually gives it up */
436  CRM_ASSERT(adaptor->is_used > 0);
437 
438  return (adaptor->fn(fd, condition, adaptor->data) == 0);
439 }
440 
441 static void
442 gio_poll_destroy(gpointer data)
443 {
444  struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
445 
446  adaptor->is_used--;
447  CRM_ASSERT(adaptor->is_used >= 0);
448 
449  if (adaptor->is_used == 0) {
450  crm_trace("Marking adaptor %p unused", adaptor);
451  adaptor->source = 0;
452  }
453 }
454 
455 static int32_t
456 gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
457  void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
458 {
459  struct gio_to_qb_poll *adaptor;
460  GIOChannel *channel;
461  int32_t res = 0;
462 
463  res = qb_array_index(gio_map, fd, (void **)&adaptor);
464  if (res < 0) {
465  crm_err("Array lookup failed for fd=%d: %d", fd, res);
466  return res;
467  }
468 
469  crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
470 
471  if (add && adaptor->source) {
472  crm_err("Adaptor for descriptor %d is still in-use", fd);
473  return -EEXIST;
474  }
475  if (!add && !adaptor->is_used) {
476  crm_err("Adaptor for descriptor %d is not in-use", fd);
477  return -ENOENT;
478  }
479 
480  /* channel is created with ref_count = 1 */
481  channel = g_io_channel_unix_new(fd);
482  if (!channel) {
483  crm_err("No memory left to add fd=%d", fd);
484  return -ENOMEM;
485  }
486 
487  if (adaptor->source) {
488  g_source_remove(adaptor->source);
489  adaptor->source = 0;
490  }
491 
492  /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */
493  evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
494 
495  adaptor->fn = fn;
496  adaptor->events = evts;
497  adaptor->data = data;
498  adaptor->p = p;
499  adaptor->is_used++;
500  adaptor->source =
501  g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor,
502  gio_poll_destroy);
503 
504  /* Now that mainloop now holds a reference to channel,
505  * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
506  *
507  * This means that channel will be free'd by:
508  * g_main_context_dispatch()
509  * -> g_source_destroy_internal()
510  * -> g_source_callback_unref()
511  * shortly after gio_poll_destroy() completes
512  */
513  g_io_channel_unref(channel);
514 
515  crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
516  if (adaptor->source > 0) {
517  return 0;
518  }
519 
520  return -EINVAL;
521 }
522 
523 static int32_t
524 gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
525  void *data, qb_ipcs_dispatch_fn_t fn)
526 {
527  return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
528 }
529 
530 static int32_t
531 gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
532  void *data, qb_ipcs_dispatch_fn_t fn)
533 {
534  return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
535 }
536 
537 static int32_t
538 gio_poll_dispatch_del(int32_t fd)
539 {
540  struct gio_to_qb_poll *adaptor;
541 
542  crm_trace("Looking for fd=%d", fd);
543  if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
544  if (adaptor->source) {
545  g_source_remove(adaptor->source);
546  adaptor->source = 0;
547  }
548  }
549  return 0;
550 }
551 
552 struct qb_ipcs_poll_handlers gio_poll_funcs = {
553  .job_add = NULL,
554  .dispatch_add = gio_poll_dispatch_add,
555  .dispatch_mod = gio_poll_dispatch_mod,
556  .dispatch_del = gio_poll_dispatch_del,
557 };
558 
559 static enum qb_ipc_type
560 pick_ipc_type(enum qb_ipc_type requested)
561 {
562  const char *env = getenv("PCMK_ipc_type");
563 
564  if (env && strcmp("shared-mem", env) == 0) {
565  return QB_IPC_SHM;
566  } else if (env && strcmp("socket", env) == 0) {
567  return QB_IPC_SOCKET;
568  } else if (env && strcmp("posix", env) == 0) {
569  return QB_IPC_POSIX_MQ;
570  } else if (env && strcmp("sysv", env) == 0) {
571  return QB_IPC_SYSV_MQ;
572  } else if (requested == QB_IPC_NATIVE) {
573  /* We prefer shared memory because the server never blocks on
574  * send. If part of a message fits into the socket, libqb
575  * needs to block until the remainder can be sent also.
576  * Otherwise the client will wait forever for the remaining
577  * bytes.
578  */
579  return QB_IPC_SHM;
580  }
581  return requested;
582 }
583 
584 qb_ipcs_service_t *
585 mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
586  struct qb_ipcs_service_handlers * callbacks)
587 {
588  int rc = 0;
589  qb_ipcs_service_t *server = NULL;
590 
591  if (gio_map == NULL) {
592  gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
593  }
594 
595  crm_client_init();
596  server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
597 
598 #ifdef HAVE_IPCS_GET_BUFFER_SIZE
599  /* All clients should use at least ipc_buffer_max as their buffer size */
600  qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
601 #endif
602 
603  qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
604 
605  rc = qb_ipcs_run(server);
606  if (rc < 0) {
607  crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
608  return NULL;
609  }
610 
611  return server;
612 }
613 
614 void
615 mainloop_del_ipc_server(qb_ipcs_service_t * server)
616 {
617  if (server) {
618  qb_ipcs_destroy(server);
619  }
620 }
621 
622 struct mainloop_io_s {
623  char *name;
624  void *userdata;
625 
626  int fd;
627  guint source;
628  crm_ipc_t *ipc;
629  GIOChannel *channel;
630 
631  int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
632  int (*dispatch_fn_io) (gpointer userdata);
633  void (*destroy_fn) (gpointer userdata);
634 
635 };
636 
637 static gboolean
638 mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer data)
639 {
640  gboolean keep = TRUE;
641  mainloop_io_t *client = data;
642 
643  CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));
644 
645  if (condition & G_IO_IN) {
646  if (client->ipc) {
647  long rc = 0;
648  int max = 10;
649 
650  do {
651  rc = crm_ipc_read(client->ipc);
652  if (rc <= 0) {
653  crm_trace("Message acquisition from %s[%p] failed: %s (%ld)",
654  client->name, client, pcmk_strerror(rc), rc);
655 
656  } else if (client->dispatch_fn_ipc) {
657  const char *buffer = crm_ipc_buffer(client->ipc);
658 
659  crm_trace("New message from %s[%p] = %d", client->name, client, rc, condition);
660  if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) {
661  crm_trace("Connection to %s no longer required", client->name);
662  keep = FALSE;
663  }
664  }
665 
666  } while (keep && rc > 0 && --max > 0);
667 
668  } else {
669  crm_trace("New message from %s[%p] %u", client->name, client, condition);
670  if (client->dispatch_fn_io) {
671  if (client->dispatch_fn_io(client->userdata) < 0) {
672  crm_trace("Connection to %s no longer required", client->name);
673  keep = FALSE;
674  }
675  }
676  }
677  }
678 
679  if (client->ipc && crm_ipc_connected(client->ipc) == FALSE) {
680  crm_err("Connection to %s[%p] closed (I/O condition=%d)", client->name, client, condition);
681  keep = FALSE;
682 
683  } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
684  crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
685  client->name, client, condition);
686  keep = FALSE;
687 
688  } else if ((condition & G_IO_IN) == 0) {
689  /*
690  #define GLIB_SYSDEF_POLLIN =1
691  #define GLIB_SYSDEF_POLLPRI =2
692  #define GLIB_SYSDEF_POLLOUT =4
693  #define GLIB_SYSDEF_POLLERR =8
694  #define GLIB_SYSDEF_POLLHUP =16
695  #define GLIB_SYSDEF_POLLNVAL =32
696 
697  typedef enum
698  {
699  G_IO_IN GLIB_SYSDEF_POLLIN,
700  G_IO_OUT GLIB_SYSDEF_POLLOUT,
701  G_IO_PRI GLIB_SYSDEF_POLLPRI,
702  G_IO_ERR GLIB_SYSDEF_POLLERR,
703  G_IO_HUP GLIB_SYSDEF_POLLHUP,
704  G_IO_NVAL GLIB_SYSDEF_POLLNVAL
705  } GIOCondition;
706 
707  A bitwise combination representing a condition to watch for on an event source.
708 
709  G_IO_IN There is data to read.
710  G_IO_OUT Data can be written (without blocking).
711  G_IO_PRI There is urgent data to read.
712  G_IO_ERR Error condition.
713  G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets).
714  G_IO_NVAL Invalid request. The file descriptor is not open.
715  */
716  crm_err("Strange condition: %d", condition);
717  }
718 
719  /* keep == FALSE results in mainloop_gio_destroy() being called
720  * just before the source is removed from mainloop
721  */
722  return keep;
723 }
724 
725 static void
726 mainloop_gio_destroy(gpointer c)
727 {
728  mainloop_io_t *client = c;
729  char *c_name = strdup(client->name);
730 
731  /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
732  * client->channel will still have ref_count > 0... should be == 1
733  */
734  crm_trace("Destroying client %s[%p]", c_name, c);
735 
736  if (client->ipc) {
737  crm_ipc_close(client->ipc);
738  }
739 
740  if (client->destroy_fn) {
741  void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
742 
743  client->destroy_fn = NULL;
744  destroy_fn(client->userdata);
745  }
746 
747  if (client->ipc) {
748  crm_ipc_t *ipc = client->ipc;
749 
750  client->ipc = NULL;
751  crm_ipc_destroy(ipc);
752  }
753 
754  crm_trace("Destroyed client %s[%p]", c_name, c);
755 
756  free(client->name); client->name = NULL;
757  free(client);
758 
759  free(c_name);
760 }
761 
763 mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata,
764  struct ipc_client_callbacks *callbacks)
765 {
766  mainloop_io_t *client = NULL;
767  crm_ipc_t *conn = crm_ipc_new(name, max_size);
768 
769  if (conn && crm_ipc_connect(conn)) {
770  int32_t fd = crm_ipc_get_fd(conn);
771 
772  client = mainloop_add_fd(name, priority, fd, userdata, NULL);
773  }
774 
775  if (client == NULL) {
776  crm_perror(LOG_TRACE, "Connection to %s failed", name);
777  if (conn) {
778  crm_ipc_close(conn);
779  crm_ipc_destroy(conn);
780  }
781  return NULL;
782  }
783 
784  client->ipc = conn;
785  client->destroy_fn = callbacks->destroy;
786  client->dispatch_fn_ipc = callbacks->dispatch;
787  return client;
788 }
789 
790 void
792 {
793  mainloop_del_fd(client);
794 }
795 
796 crm_ipc_t *
798 {
799  if (client) {
800  return client->ipc;
801  }
802  return NULL;
803 }
804 
806 mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
807  struct mainloop_fd_callbacks * callbacks)
808 {
809  mainloop_io_t *client = NULL;
810 
811  if (fd >= 0) {
812  client = calloc(1, sizeof(mainloop_io_t));
813  if (client == NULL) {
814  return NULL;
815  }
816  client->name = strdup(name);
817  client->userdata = userdata;
818 
819  if (callbacks) {
820  client->destroy_fn = callbacks->destroy;
821  client->dispatch_fn_io = callbacks->dispatch;
822  }
823 
824  client->fd = fd;
825  client->channel = g_io_channel_unix_new(fd);
826  client->source =
827  g_io_add_watch_full(client->channel, priority,
828  (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
829  client, mainloop_gio_destroy);
830 
831  /* Now that mainloop now holds a reference to channel,
832  * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
833  *
834  * This means that channel will be free'd by:
835  * g_main_context_dispatch() or g_source_remove()
836  * -> g_source_destroy_internal()
837  * -> g_source_callback_unref()
838  * shortly after mainloop_gio_destroy() completes
839  */
840  g_io_channel_unref(client->channel);
841  crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
842  } else {
843  errno = EINVAL;
844  }
845 
846  return client;
847 }
848 
849 void
851 {
852  if (client != NULL) {
853  crm_trace("Removing client %s[%p]", client->name, client);
854  if (client->source) {
855  /* Results in mainloop_gio_destroy() being called just
856  * before the source is removed from mainloop
857  */
858  g_source_remove(client->source);
859  }
860  }
861 }
862 
863 static GListPtr child_list = NULL;
864 
865 pid_t
867 {
868  return child->pid;
869 }
870 
871 const char *
873 {
874  return child->desc;
875 }
876 
877 int
879 {
880  return child->timeout;
881 }
882 
883 void *
885 {
886  return child->privatedata;
887 }
888 
889 void
891 {
892  child->privatedata = NULL;
893 }
894 
895 /* good function name */
896 static void
897 child_free(mainloop_child_t *child)
898 {
899  if (child->timerid != 0) {
900  crm_trace("Removing timer %d", child->timerid);
901  g_source_remove(child->timerid);
902  child->timerid = 0;
903  }
904  free(child->desc);
905  free(child);
906 }
907 
908 /* terrible function name */
909 static int
910 child_kill_helper(mainloop_child_t *child)
911 {
912  int rc;
913  if (child->flags & mainloop_leave_pid_group) {
914  crm_debug("Kill pid %d only. leave group intact.", child->pid);
915  rc = kill(child->pid, SIGKILL);
916  } else {
917  crm_debug("Kill pid %d's group", child->pid);
918  rc = kill(-child->pid, SIGKILL);
919  }
920 
921  if (rc < 0) {
922  if (errno != ESRCH) {
923  crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
924  }
925  return -errno;
926  }
927  return 0;
928 }
929 
930 static gboolean
931 child_timeout_callback(gpointer p)
932 {
933  mainloop_child_t *child = p;
934  int rc = 0;
935 
936  child->timerid = 0;
937  if (child->timeout) {
938  crm_crit("%s process (PID %d) will not die!", child->desc, (int)child->pid);
939  return FALSE;
940  }
941 
942  rc = child_kill_helper(child);
943  if (rc == ESRCH) {
944  /* Nothing left to do. pid doesn't exist */
945  return FALSE;
946  }
947 
948  child->timeout = TRUE;
949  crm_warn("%s process (PID %d) timed out", child->desc, (int)child->pid);
950 
951  child->timerid = g_timeout_add(5000, child_timeout_callback, child);
952  return FALSE;
953 }
954 
955 static gboolean
956 child_waitpid(mainloop_child_t *child, int flags)
957 {
958  int rc = 0;
959  int core = 0;
960  int signo = 0;
961  int status = 0;
962  int exitcode = 0;
963 
964  rc = waitpid(child->pid, &status, flags);
965  if(rc == 0) {
966  crm_perror(LOG_DEBUG, "wait(%d) = %d", child->pid, rc);
967  return FALSE;
968 
969  } else if(rc != child->pid) {
970  signo = SIGCHLD;
971  exitcode = 1;
972  status = 1;
973  crm_perror(LOG_ERR, "Call to waitpid(%d) failed", child->pid);
974 
975  } else {
976  crm_trace("Managed process %d exited: %p", child->pid, child);
977 
978  if (WIFEXITED(status)) {
979  exitcode = WEXITSTATUS(status);
980  crm_trace("Managed process %d (%s) exited with rc=%d", child->pid, child->desc, exitcode);
981 
982  } else if (WIFSIGNALED(status)) {
983  signo = WTERMSIG(status);
984  crm_trace("Managed process %d (%s) exited with signal=%d", child->pid, child->desc, signo);
985  }
986 #ifdef WCOREDUMP
987  if (WCOREDUMP(status)) {
988  core = 1;
989  crm_err("Managed process %d (%s) dumped core", child->pid, child->desc);
990  }
991 #endif
992  }
993 
994  if (child->callback) {
995  child->callback(child, child->pid, core, signo, exitcode);
996  }
997  return TRUE;
998 }
999 
1000 static void
1001 child_death_dispatch(int signal)
1002 {
1003  GListPtr iter = child_list;
1004  gboolean exited;
1005 
1006  while(iter) {
1007  GListPtr saved = NULL;
1008  mainloop_child_t *child = iter->data;
1009  exited = child_waitpid(child, WNOHANG);
1010 
1011  saved = iter;
1012  iter = iter->next;
1013 
1014  if (exited == FALSE) {
1015  continue;
1016  }
1017  crm_trace("Removing process entry %p for %d", child, child->pid);
1018 
1019  child_list = g_list_remove_link(child_list, saved);
1020  g_list_free(saved);
1021  child_free(child);
1022  }
1023 }
1024 
1025 static gboolean
1026 child_signal_init(gpointer p)
1027 {
1028  crm_trace("Installed SIGCHLD handler");
1029  /* Do NOT use g_child_watch_add() and friends, they rely on pthreads */
1030  mainloop_add_signal(SIGCHLD, child_death_dispatch);
1031 
1032  /* In case they terminated before the signal handler was installed */
1033  child_death_dispatch(SIGCHLD);
1034  return FALSE;
1035 }
1036 
1037 int
1039 {
1040  GListPtr iter;
1041  mainloop_child_t *child = NULL;
1042  mainloop_child_t *match = NULL;
1043  /* It is impossible to block SIGKILL, this allows us to
1044  * call waitpid without WNOHANG flag.*/
1045  int waitflags = 0, rc = 0;
1046 
1047  for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1048  child = iter->data;
1049  if (pid == child->pid) {
1050  match = child;
1051  }
1052  }
1053 
1054  if (match == NULL) {
1055  return FALSE;
1056  }
1057 
1058  rc = child_kill_helper(match);
1059  if(rc == -ESRCH) {
1060  /* Its gone, but hasn't shown up in waitpid() yet
1061  *
1062  * Wait until we get SIGCHLD and let child_death_dispatch()
1063  * clean it up as normal (so we get the correct return
1064  * code/status)
1065  *
1066  * The blocking alternative would be to call:
1067  * child_waitpid(match, 0);
1068  */
1069  crm_trace("Waiting for child %d to be reaped by child_death_dispatch()", match->pid);
1070  return TRUE;
1071 
1072  } else if(rc != 0) {
1073  /* If KILL for some other reason set the WNOHANG flag since we
1074  * can't be certain what happened.
1075  */
1076  waitflags = WNOHANG;
1077  }
1078 
1079  if (child_waitpid(match, waitflags) == FALSE) {
1080  /* not much we can do if this occurs */
1081  return FALSE;
1082  }
1083 
1084  child_list = g_list_remove(child_list, match);
1085  child_free(match);
1086  return TRUE;
1087 }
1088 
1089 /* Create/Log a new tracked process
1090  * To track a process group, use -pid
1091  */
1092 void
1093 mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags,
1094  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1095 {
1096  static bool need_init = TRUE;
1097  mainloop_child_t *child = g_new(mainloop_child_t, 1);
1098 
1099  child->pid = pid;
1100  child->timerid = 0;
1101  child->timeout = FALSE;
1102  child->privatedata = privatedata;
1103  child->callback = callback;
1104  child->flags = flags;
1105 
1106  if(desc) {
1107  child->desc = strdup(desc);
1108  }
1109 
1110  if (timeout) {
1111  child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
1112  }
1113 
1114  child_list = g_list_append(child_list, child);
1115 
1116  if(need_init) {
1117  need_init = FALSE;
1118  /* SIGCHLD processing has to be invoked from mainloop.
1119  * We do not want it to be possible to both add a child pid
1120  * to mainloop, and have the pid's exit callback invoked within
1121  * the same callstack. */
1122  g_timeout_add(1, child_signal_init, NULL);
1123  }
1124 }
1125 
1126 void
1127 mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
1128  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1129 {
1130  mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback);
1131 }
1132 
1133 struct mainloop_timer_s {
1134  guint id;
1135  guint period_ms;
1136  bool repeat;
1137  char *name;
1138  GSourceFunc cb;
1139  void *userdata;
1140 };
1141 
1142 struct mainloop_timer_s mainloop;
1143 
1144 static gboolean mainloop_timer_cb(gpointer user_data)
1145 {
1146  int id = 0;
1147  bool repeat = FALSE;
1148  struct mainloop_timer_s *t = user_data;
1149 
1150  CRM_ASSERT(t != NULL);
1151 
1152  id = t->id;
1153  t->id = 0; /* Ensure its unset during callbacks so that
1154  * mainloop_timer_running() works as expected
1155  */
1156 
1157  if(t->cb) {
1158  crm_trace("Invoking callbacks for timer %s", t->name);
1159  repeat = t->repeat;
1160  if(t->cb(t->userdata) == FALSE) {
1161  crm_trace("Timer %s complete", t->name);
1162  repeat = FALSE;
1163  }
1164  }
1165 
1166  if(repeat) {
1167  /* Restore if repeating */
1168  t->id = id;
1169  }
1170 
1171  return repeat;
1172 }
1173 
1175 {
1176  if(t && t->id != 0) {
1177  return TRUE;
1178  }
1179  return FALSE;
1180 }
1181 
1183 {
1185  if(t && t->period_ms > 0) {
1186  crm_trace("Starting timer %s", t->name);
1187  t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
1188  }
1189 }
1190 
1192 {
1193  if(t && t->id != 0) {
1194  crm_trace("Stopping timer %s", t->name);
1195  g_source_remove(t->id);
1196  t->id = 0;
1197  }
1198 }
1199 
1201 {
1202  guint last = 0;
1203 
1204  if(t) {
1205  last = t->period_ms;
1206  t->period_ms = period_ms;
1207  }
1208 
1209  if(t && t->id != 0 && last != t->period_ms) {
1211  }
1212  return last;
1213 }
1214 
1215 
1217 mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
1218 {
1219  mainloop_timer_t *t = calloc(1, sizeof(mainloop_timer_t));
1220 
1221  if(t) {
1222  if(name) {
1223  t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat);
1224  } else {
1225  t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat);
1226  }
1227  t->id = 0;
1228  t->period_ms = period_ms;
1229  t->repeat = repeat;
1230  t->cb = cb;
1231  t->userdata = userdata;
1232  crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
1233  }
1234  return t;
1235 }
1236 
1237 void
1239 {
1240  if(t) {
1241  crm_trace("Destroying timer %s", t->name);
1243  free(t->name);
1244  free(t);
1245  }
1246 }
1247 
void * mainloop_child_userdata(mainloop_child_t *child)
Definition: mainloop.c:884
#define LOG_TRACE
Definition: logging.h:29
pid_t mainloop_child_pid(mainloop_child_t *child)
Definition: mainloop.c:866
bool mainloop_timer_running(mainloop_timer_t *t)
Definition: mainloop.c:1174
bool crm_ipc_connect(crm_ipc_t *client)
Establish an IPC connection to a Pacemaker component.
Definition: ipc.c:801
A dumping ground.
void(* destroy)(gpointer)
Definition: mainloop.h:74
#define crm_notice(fmt, args...)
Definition: logging.h:250
struct signal_s crm_signal_t
#define crm_crit(fmt, args...)
Definition: logging.h:247
void mainloop_del_fd(mainloop_io_t *client)
Definition: mainloop.c:850
void mainloop_del_ipc_server(qb_ipcs_service_t *server)
Definition: mainloop.c:615
mainloop_child_flags
Definition: mainloop.h:29
gboolean mainloop_destroy_signal(int sig)
Definition: mainloop.c:381
crm_trigger_t * mainloop_add_trigger(int priority, int(*dispatch)(gpointer user_data), gpointer userdata)
Definition: mainloop.c:211
void(* destroy)(gpointer userdata)
Definition: mainloop.h:91
int crm_ipc_get_fd(crm_ipc_t *client)
Definition: ipc.c:907
const char * pcmk_strerror(int rc)
Definition: logging.c:1113
void mainloop_trigger_complete(crm_trigger_t *trig)
Definition: mainloop.c:199
const char * mainloop_child_name(mainloop_child_t *child)
Definition: mainloop.c:872
struct mainloop_timer_s mainloop_timer_t
Definition: mainloop.h:37
struct mainloop_io_s mainloop_io_t
Definition: mainloop.h:35
struct mainloop_child_s mainloop_child_t
Definition: mainloop.h:36
gboolean crm_signal(int sig, void(*dispatch)(int sig))
Definition: mainloop.c:298
long crm_ipc_read(crm_ipc_t *client)
Definition: ipc.c:1005
mainloop_timer_t * mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
Definition: mainloop.c:1217
uint32_t pid
Definition: internal.h:49
mainloop_io_t * mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
Definition: mainloop.c:763
struct qb_ipcs_poll_handlers gio_poll_funcs
Definition: mainloop.c:552
guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
Definition: mainloop.c:1200
Wrappers for and extensions to glib mainloop.
void crm_client_init(void)
Definition: ipc.c:253
void mainloop_timer_del(mainloop_timer_t *t)
Definition: mainloop.c:1238
crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client)
Definition: mainloop.c:797
const char * crm_ipc_buffer(crm_ipc_t *client)
Definition: ipc.c:1051
void mainloop_del_ipc_client(mainloop_io_t *client)
Definition: mainloop.c:791
struct trigger_s crm_trigger_t
Definition: mainloop.h:34
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:90
#define crm_warn(fmt, args...)
Definition: logging.h:249
int mainloop_child_timeout(mainloop_child_t *child)
Definition: mainloop.c:878
uint32_t id
Definition: internal.h:48
uint64_t flags
Definition: remote.c:121
#define crm_debug(fmt, args...)
Definition: logging.h:253
struct crm_ipc_s crm_ipc_t
Definition: ipc.h:61
void mainloop_set_trigger(crm_trigger_t *source)
Definition: mainloop.c:223
#define crm_trace(fmt, args...)
Definition: logging.h:254
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:806
void mainloop_timer_start(mainloop_timer_t *t)
Definition: mainloop.c:1182
qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
Definition: mainloop.c:585
Wrappers for and extensions to libxml2.
void mainloop_clear_child_userdata(mainloop_child_t *child)
Definition: mainloop.c:890
void mainloop_timer_stop(mainloop_timer_t *t)
Definition: mainloop.c:1191
void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition: mainloop.c:1093
unsigned int crm_ipc_default_buffer_size(void)
Definition: ipc.c:68
void crm_ipc_destroy(crm_ipc_t *client)
Definition: ipc.c:884
void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition: mainloop.c:1127
bool crm_ipc_connected(crm_ipc_t *client)
Definition: ipc.c:921
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:226
#define crm_err(fmt, args...)
Definition: logging.h:248
struct mainloop_timer_s mainloop
Definition: mainloop.c:1142
crm_ipc_t * crm_ipc_new(const char *name, size_t max_size)
Definition: ipc.c:771
#define CRM_ASSERT(expr)
Definition: error.h:35
char data[0]
Definition: internal.h:58
void mainloop_cleanup(void)
Definition: mainloop.c:407
gboolean mainloop_add_signal(int sig, void(*dispatch)(int sig))
Definition: mainloop.c:323
int mainloop_child_kill(pid_t pid)
Definition: mainloop.c:1038
gboolean mainloop_destroy_trigger(crm_trigger_t *source)
Definition: mainloop.c:231
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__, 1, 2)))
void crm_ipc_close(crm_ipc_t *client)
Definition: ipc.c:869
GList * GListPtr
Definition: crm.h:190
#define crm_info(fmt, args...)
Definition: logging.h:251
int(* dispatch)(const char *buffer, ssize_t length, gpointer userdata)
Definition: mainloop.h:73
enum crm_ais_msg_types type
Definition: internal.h:51
#define int32_t
Definition: stdint.in.h:157