RTS API Documentation  1.10.11
switch_scheduler.c
Go to the documentation of this file.
1 /*
2  * FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
3  * Copyright (C) 2005-2014, Anthony Minessale II <anthm@freeswitch.org>
4  *
5  * Version: MPL 1.1
6  *
7  * The contents of this file are subject to the Mozilla Public License Version
8  * 1.1 (the "License"); you may not use this file except in compliance with
9  * the License. You may obtain a copy of the License at
10  * http://www.mozilla.org/MPL/
11  *
12  * Software distributed under the License is distributed on an "AS IS" basis,
13  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
14  * for the specific language governing rights and limitations under the
15  * License.
16  *
17  * The Original Code is FreeSWITCH Modular Media Switching Software Library / Soft-Switch Application
18  *
19  * The Initial Developer of the Original Code is
20  * Anthony Minessale II <anthm@freeswitch.org>
21  * Portions created by the Initial Developer are Copyright (C)
22  * the Initial Developer. All Rights Reserved.
23  *
24  * Contributor(s):
25  *
26  * Anthony Minessale II <anthm@freeswitch.org>
27  *
28  *
29  * switch_scheduler.c -- Switch Scheduler
30  *
31  */
32 
33 #include <switch.h>
34 
37  int64_t executed;
38  int in_thread;
39  int destroyed;
40  int running;
44  uint32_t flags;
45  char *desc;
47 };
49 
50 static struct {
53  uint32_t task_id;
57 } globals = { 0 };
58 
60 {
61  switch_event_t *event;
62  //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Executing task %u %s (%s)\n", tp->task.task_id, tp->desc, switch_str_nil(tp->task.group));
63 
64  tp->func(&tp->task);
65 
66  switch_mutex_lock(globals.task_mutex);
67  if (tp->task.repeat) {
68  tp->task.runtime = switch_epoch_time_now(NULL) + tp->task.repeat;
69  }
70 
71  if (!tp->destroy_requested && tp->task.runtime > tp->executed) {
72  tp->executed = 0;
74  switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-ID", "%u", tp->task.task_id);
78  switch_queue_push(globals.event_queue, event);
79  event = NULL;
80  }
81  } else {
82  tp->destroyed = 1;
83  }
84  switch_mutex_unlock(globals.task_mutex);
85 }
86 
88 {
91 
92  pool = tp->pool;
93  tp->pool = NULL;
94 
97  tp->in_thread = 0;
98 
99  return NULL;
100 }
101 
102 static int task_thread_loop(int done)
103 {
104  switch_scheduler_task_container_t *tofree, *tp, *last = NULL;
105 
106 
107  switch_mutex_lock(globals.task_mutex);
108 
109  for (tp = globals.task_list; tp; tp = tp->next) {
110  if (done) {
111  tp->destroyed = 1;
112  } else if (!tp->destroyed) {
113  int64_t now = switch_epoch_time_now(NULL);
114  if (now >= tp->task.runtime && !tp->in_thread) {
115  int32_t diff = (int32_t) (now - tp->task.runtime);
116  if (diff > 1) {
117  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Task was executed late by %d seconds %u %s (%s)\n",
118  diff, tp->task.task_id, tp->desc, switch_str_nil(tp->task.group));
119  }
120  tp->executed = now;
123  switch_threadattr_t *thd_attr;
125  switch_threadattr_create(&thd_attr, tp->pool);
126  switch_threadattr_detach_set(thd_attr, 1);
127  tp->in_thread = 1;
128  switch_thread_create(&thread, thd_attr, task_own_thread, tp, tp->pool);
129  } else {
130  tp->running = 1;
131  switch_mutex_unlock(globals.task_mutex);
133  switch_mutex_lock(globals.task_mutex);
134  tp->running = 0;
135  }
136  }
137  }
138  }
139  switch_mutex_unlock(globals.task_mutex);
140  switch_mutex_lock(globals.task_mutex);
141  for (tp = globals.task_list; tp;) {
142  if (tp->destroyed && !tp->in_thread) {
143  switch_event_t *event;
144 
145  tofree = tp;
146  tp = tp->next;
147  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Deleting task %u %s (%s)\n",
148  tofree->task.task_id, tofree->desc, switch_str_nil(tofree->task.group));
149 
150 
152  switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-ID", "%u", tofree->task.task_id);
153  switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Desc", tofree->desc);
155  switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-Runtime", "%" SWITCH_INT64_T_FMT, tofree->task.runtime);
156  switch_queue_push(globals.event_queue, event);
157  event = NULL;
158  }
159 
160  if (last) {
161  last->next = tofree->next;
162  } else {
163  globals.task_list = tofree->next;
164  }
165  switch_safe_free(tofree->task.group);
166  if (tofree->task.cmd_arg && switch_test_flag(tofree, SSHF_FREE_ARG)) {
167  free(tofree->task.cmd_arg);
168  }
169  switch_safe_free(tofree->desc);
170  free(tofree);
171  } else {
172  last = tp;
173  tp = tp->next;
174  }
175  }
176  switch_mutex_unlock(globals.task_mutex);
177 
178  return done;
179 }
180 
182 {
183  void *pop;
184  globals.task_thread_running = 1;
185 
186  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Starting task thread\n");
187  while (globals.task_thread_running == 1) {
188  if (task_thread_loop(0)) {
189  break;
190  }
191  if (switch_queue_pop_timeout(globals.event_queue, &pop, 500000) == SWITCH_STATUS_SUCCESS) {
192  switch_event_t *event = (switch_event_t *) pop;
193  switch_event_fire(&event);
194  }
195  }
196 
197  task_thread_loop(1);
198 
199  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "Task thread ending\n");
200 
201  while(switch_queue_trypop(globals.event_queue, &pop) == SWITCH_STATUS_SUCCESS) {
202  switch_event_t *event = (switch_event_t *) pop;
203  switch_event_destroy(&event);
204  }
205 
206  globals.task_thread_running = 0;
207 
208  return NULL;
209 }
210 
211 SWITCH_DECLARE(uint32_t) switch_scheduler_add_task(time_t task_runtime,
213  const char *desc, const char *group, uint32_t cmd_id, void *cmd_arg, switch_scheduler_flag_t flags)
214 {
215  uint32_t task_id;
216 
217  switch_scheduler_add_task_ex(task_runtime, func, desc, group, cmd_id, cmd_arg, flags, &task_id);
218 
219  return task_id;
220 }
221 
222 SWITCH_DECLARE(uint32_t) switch_scheduler_add_task_ex(time_t task_runtime,
224  const char *desc, const char *group, uint32_t cmd_id, void *cmd_arg, switch_scheduler_flag_t flags, uint32_t *task_id)
225 {
226  uint32_t result;
227  switch_scheduler_task_container_t *container, *tp;
228  switch_event_t *event;
230  switch_ssize_t hlen = -1;
231 
232  switch_mutex_lock(globals.task_mutex);
233  switch_zmalloc(container, sizeof(*container));
234  switch_assert(func);
235  switch_assert(task_id);
236 
237  if (task_runtime < now) {
238  container->task.repeat = (uint32_t)task_runtime;
239  task_runtime += now;
240  }
241 
242  container->func = func;
243  container->task.created = now;
244  container->task.runtime = task_runtime;
245  container->task.group = strdup(group ? group : "none");
246  container->task.cmd_id = cmd_id;
247  container->task.cmd_arg = cmd_arg;
248  container->flags = flags;
249  container->desc = strdup(desc ? desc : "none");
250  container->task.hash = switch_ci_hashfunc_default(container->task.group, &hlen);
251 
252  for (tp = globals.task_list; tp && tp->next; tp = tp->next);
253 
254  if (tp) {
255  tp->next = container;
256  } else {
257  globals.task_list = container;
258  }
259 
260  for (container->task.task_id = 0; !container->task.task_id; container->task.task_id = ++globals.task_id);
261 
262  tp = container;
263  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Added task %u %s (%s) to run at %" SWITCH_INT64_T_FMT "\n",
264  tp->task.task_id, tp->desc, switch_str_nil(tp->task.group), tp->task.runtime);
265 
267  switch_event_add_header(event, SWITCH_STACK_BOTTOM, "Task-ID", "%u", tp->task.task_id);
268  switch_event_add_header_string(event, SWITCH_STACK_BOTTOM, "Task-Desc", tp->desc);
271  switch_queue_push(globals.event_queue, event);
272  event = NULL;
273  }
274 
275  result = *task_id = container->task.task_id;
276 
277  switch_mutex_unlock(globals.task_mutex);
278 
279  return result;
280 }
281 
283 {
285  uint32_t delcnt = 0;
286 
287  switch_mutex_lock(globals.task_mutex);
288  for (tp = globals.task_list; tp; tp = tp->next) {
289  if (tp->task.task_id == task_id) {
290  if (switch_test_flag(tp, SSHF_NO_DEL)) {
291  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Attempt made to delete undeletable task #%u (group %s)\n",
292  tp->task.task_id, tp->task.group);
293  break;
294  }
295 
296  if (tp->running) {
297  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Attempt made to delete running task #%u (group %s)\n",
298  tp->task.task_id, tp->task.group);
299  tp->destroy_requested++;
300  } else {
301  tp->destroyed++;
302  }
303 
304  delcnt++;
305  break;
306  }
307  }
308  switch_mutex_unlock(globals.task_mutex);
309 
310  return delcnt;
311 }
312 
314 {
316  uint32_t delcnt = 0;
317  switch_ssize_t hlen = -1;
318  unsigned long hash = 0;
319 
320  if (zstr(group)) {
321  return 0;
322  }
323 
324  hash = switch_ci_hashfunc_default(group, &hlen);
325 
326  switch_mutex_lock(globals.task_mutex);
327  for (tp = globals.task_list; tp; tp = tp->next) {
328  if (tp->destroyed) {
329  continue;
330  }
331  if (hash == tp->task.hash && !strcmp(tp->task.group, group)) {
332  if (switch_test_flag(tp, SSHF_NO_DEL)) {
333  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Attempt made to delete undeletable task #%u (group %s)\n",
334  tp->task.task_id, group);
335  continue;
336  }
337  if (tp->running) {
338  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Attempt made to delete running task #%u (group %s)\n",
339  tp->task.task_id, tp->task.group);
340  tp->destroy_requested++;
341  } else {
342  tp->destroyed++;
343  }
344  delcnt++;
345  }
346  }
347  switch_mutex_unlock(globals.task_mutex);
348 
349  return delcnt;
350 }
351 
353 
355 {
356 
357  switch_threadattr_t *thd_attr;
358 
359  switch_core_new_memory_pool(&globals.memory_pool);
360  switch_threadattr_create(&thd_attr, globals.memory_pool);
361  switch_mutex_init(&globals.task_mutex, SWITCH_MUTEX_NESTED, globals.memory_pool);
362  switch_queue_create(&globals.event_queue, 250000, globals.memory_pool);
363 
365 }
366 
368 {
369  switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CONSOLE, "Stopping Task Thread\n");
370  if (globals.task_thread_running == 1) {
371  int sanity = 0;
372  switch_status_t st;
373 
374  globals.task_thread_running = -1;
375 
377 
378  while (globals.task_thread_running) {
379  switch_yield(100000);
380  if (++sanity > 10) {
381  break;
382  }
383  }
384  }
385 
387 
388 }
389 
390 /* For Emacs:
391  * Local Variables:
392  * mode:c
393  * indent-tabs-mode:t
394  * tab-width:4
395  * c-basic-offset:4
396  * End:
397  * For VIM:
398  * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
399  */
#define switch_event_fire(event)
Fire an event filling in most of the arguements with obvious values.
Definition: switch_event.h:413
#define switch_core_new_memory_pool(p)
Create a new sub memory pool from the core&#39;s master pool.
Definition: switch_core.h:633
uint32_t switch_scheduler_del_task_group(const char *group)
Delete a scheduled task based on the group name.
#define SWITCH_THREAD_FUNC
#define SWITCH_CHANNEL_LOG
switch_scheduler_func_t func
static void *SWITCH_THREAD_FUNC task_own_thread(switch_thread_t *thread, void *obj)
static void switch_scheduler_execute(switch_scheduler_task_container_t *tp)
switch_status_t switch_queue_pop_timeout(switch_queue_t *queue, void **data, switch_interval_time_t timeout)
Definition: switch_apr.c:1248
void switch_scheduler_task_thread_start(void)
Start the scheduler system.
#define switch_core_destroy_memory_pool(p)
Returns a subpool back to the main pool.
Definition: switch_core.h:642
switch_status_t switch_event_add_header(switch_event_t *event, switch_stack_t stack, const char *header_name, const char *fmt,...) PRINTF_FUNCTION(4
Add a header to an event.
Representation of an event.
Definition: switch_event.h:80
switch_status_t switch_queue_trypop(switch_queue_t *queue, void **data)
Definition: switch_apr.c:1264
unsigned int switch_ci_hashfunc_default(const char *char_key, switch_ssize_t *klen)
Definition: switch_apr.c:121
void switch_scheduler_task_thread_stop(void)
Stop the scheduler system.
uint32_t switch_scheduler_flag_t
Definition: switch_types.h:497
static switch_thread_t * thread
Definition: switch_log.c:486
switch_hash_t * hash
Definition: switch_event.c:76
uint32_t switch_scheduler_add_task(time_t task_runtime, switch_scheduler_func_t func, const char *desc, const char *group, uint32_t cmd_id, void *cmd_arg, switch_scheduler_flag_t flags)
Schedule a task in the future.
#define zstr(x)
Definition: switch_utils.h:314
switch_status_t switch_mutex_unlock(switch_mutex_t *lock)
Definition: switch_apr.c:313
#define SWITCH_MUTEX_NESTED
Definition: switch_apr.h:318
switch_status_t switch_threadattr_detach_set(switch_threadattr_t *attr, int32_t on)
Definition: switch_apr.c:678
int64_t switch_time_t
Definition: switch_apr.h:188
#define switch_yield(ms)
Wait a desired number of microseconds and yield the CPU.
Definition: switch_utils.h:998
int task_thread_running
switch_status_t switch_mutex_lock(switch_mutex_t *lock)
Definition: switch_apr.c:308
intptr_t switch_ssize_t
uint32_t switch_scheduler_del_task_id(uint32_t task_id)
Delete a scheduled task.
switch_status_t switch_event_add_header_string(switch_event_t *event, switch_stack_t stack, const char *header_name, const char *data)
Add a string header to an event.
#define switch_zmalloc(ptr, len)
#define switch_safe_free(it)
Free a pointer and set it to NULL unless it already is NULL.
Definition: switch_utils.h:885
switch_status_t switch_mutex_init(switch_mutex_t **lock, unsigned int flags, switch_memory_pool_t *pool)
Definition: switch_apr.c:293
switch_status_t switch_thread_join(switch_status_t *retval, switch_thread_t *thd)
Definition: switch_apr.c:1379
switch_memory_pool_t * memory_pool
#define switch_str_nil(s)
Make a null string a blank string instead.
Definition: switch_utils.h:993
struct fspr_thread_mutex_t switch_mutex_t
Definition: switch_apr.h:314
uint32_t switch_scheduler_add_task_ex(time_t task_runtime, switch_scheduler_func_t func, const char *desc, const char *group, uint32_t cmd_id, void *cmd_arg, switch_scheduler_flag_t flags, uint32_t *task_id)
Schedule a task in the future.
struct switch_scheduler_task_container * next
switch_thread_t * task_thread_p
switch_status_t
Common return values.
switch_queue_t * event_queue
void(* switch_scheduler_func_t)(switch_scheduler_task_t *task)
Main Library Header.
#define switch_event_create(event, id)
Create a new event assuming it will not be custom event and therefore hiding the unused parameters...
Definition: switch_event.h:384
#define SWITCH_INT64_T_FMT
#define SWITCH_DECLARE(type)
static void *SWITCH_THREAD_FUNC switch_scheduler_task_thread(switch_thread_t *thread, void *obj)
uint32_t task_id
time_t switch_epoch_time_now(time_t *t)
Get the current epoch time.
Definition: switch_time.c:322
static struct @8 globals
switch_status_t switch_queue_push(switch_queue_t *queue, void *data)
Definition: switch_apr.c:1253
#define switch_test_flag(obj, flag)
Test for the existance of a flag on an arbitary object.
Definition: switch_utils.h:693
void switch_log_printf(_In_ switch_text_channel_t channel, _In_z_ const char *file, _In_z_ const char *func, _In_ int line, _In_opt_z_ const char *userdata, _In_ switch_log_level_t level, _In_z_ _Printf_format_string_ const char *fmt,...) PRINTF_FUNCTION(7
Write log data to the logging engine.
switch_status_t switch_threadattr_create(switch_threadattr_t **new_attr, switch_memory_pool_t *pool)
Definition: switch_apr.c:665
switch_status_t switch_thread_create(switch_thread_t **new_thread, switch_threadattr_t *attr, switch_thread_start_t func, void *data, switch_memory_pool_t *cont)
Definition: switch_apr.c:698
struct fspr_pool_t switch_memory_pool_t
switch_memory_pool_t * pool
switch_status_t switch_queue_create(switch_queue_t **queue, unsigned int queue_capacity, switch_memory_pool_t *pool)
Definition: switch_apr.c:1233
void switch_event_destroy(switch_event_t **event)
Destroy an event.
#define switch_assert(expr)
struct fspr_thread_t switch_thread_t
Definition: switch_apr.h:941
static int task_thread_loop(int done)
switch_mutex_t * task_mutex
switch_scheduler_task_container_t * task_list
switch_scheduler_task_t task