/* GSL Engine - Flow module operation engine * Copyright (C) 2001 Tim Janik * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General * Public License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, * Boston, MA 02110-1301, USA. */ #include "gsloputil.h" #include "gslcommon.h" #include "gslopnode.h" #include "gslopschedule.h" #include "gslsignal.h" #include #include #include #include #include /* --- UserThread --- */ GslOStream* _engine_alloc_ostreams (guint n) { if (n) { guint i = sizeof (GslOStream) * n + sizeof (gfloat) * gsl_engine_block_size () * n; GslOStream *streams = gsl_alloc_memblock0 (i); gfloat *buffers = (gfloat*) (streams + n); for (i = 0; i < n; i++) { streams[i].values = buffers; buffers += gsl_engine_block_size (); } return streams; } else return NULL; } static void free_node (EngineNode *node) { guint j; g_return_if_fail (node != NULL); g_return_if_fail (node->output_nodes == NULL); g_return_if_fail (node->integrated == FALSE); g_return_if_fail (node->sched_tag == FALSE); g_return_if_fail (node->sched_router_tag == FALSE); if (node->module.klass->free) node->module.klass->free (node->module.user_data, node->module.klass); gsl_rec_mutex_destroy (&node->rec_mutex); if (node->module.ostreams) { guint n = ENGINE_NODE_N_OSTREAMS (node); guint i = sizeof (GslOStream) * n + sizeof (gfloat) * gsl_engine_block_size () * n; gsl_free_memblock (i, node->module.ostreams); gsl_delete_structs (EngineOutput, ENGINE_NODE_N_OSTREAMS (node), node->outputs); } if (node->module.istreams) { gsl_delete_structs (GslIStream, ENGINE_NODE_N_ISTREAMS (node), node->module.istreams); gsl_delete_structs (EngineInput, ENGINE_NODE_N_ISTREAMS (node), node->inputs); } for (j = 0; j < ENGINE_NODE_N_JSTREAMS (node); j++) { g_free (node->jinputs[j]); g_free (node->module.jstreams[j].values); } if (node->module.jstreams) { gsl_delete_structs (GslJStream, ENGINE_NODE_N_JSTREAMS (node), node->module.jstreams); gsl_delete_structs (EngineJInput*, ENGINE_NODE_N_JSTREAMS (node), node->jinputs); } gsl_delete_struct (EngineNode, node); } static void free_job (GslJob *job) { g_return_if_fail (job != NULL); switch (job->job_id) { case ENGINE_JOB_ACCESS: if (job->data.access.free_func) job->data.access.free_func (job->data.access.data); break; case ENGINE_JOB_DEBUG: g_free (job->data.debug); break; case ENGINE_JOB_ADD_POLL: case ENGINE_JOB_REMOVE_POLL: g_free (job->data.poll.fds); if (job->data.poll.free_func) job->data.poll.free_func (job->data.poll.data); break; case ENGINE_JOB_DISCARD: free_node (job->data.node); break; default: ; } gsl_delete_struct (GslJob, job); } static void free_flow_job (EngineFlowJob *fjob) { switch (fjob->fjob_id) { case ENGINE_FLOW_JOB_SUSPEND: case ENGINE_FLOW_JOB_RESUME: gsl_delete_struct (EngineFlowJobAny, &fjob->any); break; case ENGINE_FLOW_JOB_ACCESS: if (fjob->access.free_func) fjob->access.free_func (fjob->access.data); gsl_delete_struct (EngineFlowJobAccess, &fjob->access); break; default: g_assert_not_reached (); } } void _engine_free_trans (GslTrans *trans) { GslJob *job; g_return_if_fail (trans != NULL); g_return_if_fail (trans->comitted == FALSE); if (trans->jobs_tail) g_return_if_fail (trans->jobs_tail->next == NULL); /* paranoid */ job = trans->jobs_head; while (job) { GslJob *tmp = job->next; free_job (job); job = tmp; } gsl_delete_struct (GslTrans, trans); } /* -- master node list --- */ static EngineNode *master_node_list_head = NULL; static EngineNode *master_node_list_tail = NULL; EngineNode* _engine_mnl_head (void) { return master_node_list_head; } void _engine_mnl_remove (EngineNode *node) { g_return_if_fail (node->integrated == TRUE); node->integrated = FALSE; /* remove */ if (node->mnl_prev) node->mnl_prev->mnl_next = node->mnl_next; else master_node_list_head = node->mnl_next; if (node->mnl_next) node->mnl_next->mnl_prev = node->mnl_prev; else master_node_list_tail = node->mnl_prev; node->mnl_prev = NULL; node->mnl_next = NULL; } void _engine_mnl_integrate (EngineNode *node) { g_return_if_fail (node->integrated == FALSE); g_return_if_fail (node->flow_jobs == NULL); node->integrated = TRUE; /* append */ if (master_node_list_tail) master_node_list_tail->mnl_next = node; node->mnl_prev = master_node_list_tail; master_node_list_tail = node; if (!master_node_list_head) master_node_list_head = master_node_list_tail; g_assert (node->mnl_next == NULL); } void _engine_mnl_reorder (EngineNode *node) { EngineNode *sibling; g_return_if_fail (node->integrated == TRUE); /* the master node list is partially sorted. that is, all * nodes which are not scheduled and have pending flow_jobs * are agglomerated at the head. */ sibling = node->mnl_prev ? node->mnl_prev : node->mnl_next; if (sibling && GSL_MNL_HEAD_NODE (node) != GSL_MNL_HEAD_NODE (sibling)) { /* remove */ if (node->mnl_prev) node->mnl_prev->mnl_next = node->mnl_next; else master_node_list_head = node->mnl_next; if (node->mnl_next) node->mnl_next->mnl_prev = node->mnl_prev; else master_node_list_tail = node->mnl_prev; if (GSL_MNL_HEAD_NODE (node)) /* move towards head */ { /* prepend to non-NULL list */ master_node_list_head->mnl_prev = node; node->mnl_next = master_node_list_head; master_node_list_head = node; node->mnl_prev = NULL; } else /* move towards tail */ { /* append to non-NULL list */ master_node_list_tail->mnl_next = node; node->mnl_prev = master_node_list_tail; master_node_list_tail = node; node->mnl_next = NULL; } } } /* --- const value blocks --- */ typedef struct { guint n_nodes; gfloat **nodes; guint8 *nodes_used; } ConstValuesArray; static const guint8 CONST_VALUES_EXPIRE = 16; /* expire value after being unused for 16 times */ static inline gfloat** const_values_lookup_nextmost (ConstValuesArray *array, gfloat key_value) { guint n_nodes = array->n_nodes; if (n_nodes > 0) { gfloat **nodes = array->nodes; gfloat **check; nodes -= 1; do { guint i; gfloat cmp; i = (n_nodes + 1) >> 1; check = nodes + i; cmp = key_value - **check; if (cmp > GSL_SIGNAL_EPSILON) { n_nodes -= i; nodes = check; } else if (cmp < -GSL_SIGNAL_EPSILON) n_nodes = i - 1; else /* cmp ~==~ 0.0 */ return check; /* matched */ } while (n_nodes); return check; /* nextmost */ } return NULL; } static inline guint upper_power2 (guint number) { return gsl_alloc_upper_power2 (MAX (number, 8)); } static inline void const_values_insert (ConstValuesArray *array, guint index, gfloat *value_block) { if (array->n_nodes == 0) { guint new_size = upper_power2 (sizeof (gfloat*)); array->nodes = g_realloc (array->nodes, new_size); array->nodes_used = g_realloc (array->nodes_used, new_size / sizeof (gfloat*)); array->n_nodes = 1; g_assert (index == 0); } else { guint n_nodes = array->n_nodes++; if (*array->nodes[index] < *value_block) index++; if (1) { guint new_size = upper_power2 (array->n_nodes * sizeof (gfloat*)); guint old_size = upper_power2 (n_nodes * sizeof (gfloat*)); if (new_size != old_size) { array->nodes = g_realloc (array->nodes, new_size); array->nodes_used = g_realloc (array->nodes_used, new_size / sizeof(gfloat*)); } } g_memmove (array->nodes + index + 1, array->nodes + index, (n_nodes - index) * sizeof (array->nodes[0])); g_memmove (array->nodes_used + index + 1, array->nodes_used + index, (n_nodes - index) * sizeof (array->nodes_used[0])); } array->nodes[index] = value_block; array->nodes_used[index] = CONST_VALUES_EXPIRE; } static ConstValuesArray cvalue_array = { 0, NULL, NULL }; gfloat* gsl_engine_const_values (gfloat value) { extern const gfloat gsl_engine_master_zero_block[]; gfloat **block; if (fabs (value) < GSL_SIGNAL_EPSILON) return (gfloat*) gsl_engine_master_zero_block; block = const_values_lookup_nextmost (&cvalue_array, value); /* found correct match? */ if (block && fabs (**block - value) < GSL_SIGNAL_EPSILON) { cvalue_array.nodes_used[block - cvalue_array.nodes] = CONST_VALUES_EXPIRE; return *block; } else { /* create new value block */ gfloat *values = g_new (gfloat, gsl_engine_block_size ()); guint i; for (i = 0; i < gsl_engine_block_size (); i++) values[i] = value; if (block) const_values_insert (&cvalue_array, block - cvalue_array.nodes, values); else const_values_insert (&cvalue_array, 0, values); return values; } } void _engine_recycle_const_values (void) { gfloat **nodes = cvalue_array.nodes; guint8 *used = cvalue_array.nodes_used; guint count = cvalue_array.n_nodes, e = 0, i; for (i = 0; i < count; i++) { used[i]--; /* invariant: use counts are never 0 */ if (used[i] == 0) g_free (nodes[i]); else /* preserve node */ { if (e < i) { nodes[e] = nodes[i]; used[e] = used[i]; } e++; } } cvalue_array.n_nodes = e; } /* --- job transactions --- */ static GslMutex cqueue_trans = { 0, }; static GslTrans *cqueue_trans_pending_head = NULL; static GslTrans *cqueue_trans_pending_tail = NULL; static GslCond cqueue_trans_cond = { 0, }; static GslTrans *cqueue_trans_trash = NULL; static GslTrans *cqueue_trans_active_head = NULL; static GslTrans *cqueue_trans_active_tail = NULL; static EngineFlowJob *cqueue_trash_fjobs = NULL; static GslJob *cqueue_trans_job = NULL; void _engine_enqueue_trans (GslTrans *trans) { g_return_if_fail (trans != NULL); g_return_if_fail (trans->comitted == TRUE); g_return_if_fail (trans->jobs_head != NULL); g_return_if_fail (trans->cqt_next == NULL); GSL_SPIN_LOCK (&cqueue_trans); if (cqueue_trans_pending_tail) { cqueue_trans_pending_tail->cqt_next = trans; cqueue_trans_pending_tail->jobs_tail->next = trans->jobs_head; } else cqueue_trans_pending_head = trans; cqueue_trans_pending_tail = trans; GSL_SPIN_UNLOCK (&cqueue_trans); gsl_cond_signal (&cqueue_trans_cond); } void _engine_wait_on_trans (void) { GSL_SPIN_LOCK (&cqueue_trans); while (cqueue_trans_pending_head || cqueue_trans_active_head) gsl_cond_wait (&cqueue_trans_cond, &cqueue_trans); GSL_SPIN_UNLOCK (&cqueue_trans); } gboolean _engine_job_pending (void) { gboolean pending = cqueue_trans_job != NULL; if (!pending) { GSL_SPIN_LOCK (&cqueue_trans); pending = cqueue_trans_pending_head != NULL; GSL_SPIN_UNLOCK (&cqueue_trans); } return pending; } GslJob* _engine_pop_job (void) /* (glong max_useconds) */ { /* clean up if necessary and try fetching new jobs */ if (!cqueue_trans_job) { if (cqueue_trans_active_head) { GSL_SPIN_LOCK (&cqueue_trans); /* get rid of processed transaction and * signal UserThread which might be in * op_com_wait_on_trans() */ cqueue_trans_active_tail->cqt_next = cqueue_trans_trash; cqueue_trans_trash = cqueue_trans_active_head; /* fetch new transaction */ cqueue_trans_active_head = cqueue_trans_pending_head; cqueue_trans_active_tail = cqueue_trans_pending_tail; cqueue_trans_pending_head = NULL; cqueue_trans_pending_tail = NULL; GSL_SPIN_UNLOCK (&cqueue_trans); gsl_cond_signal (&cqueue_trans_cond); } else { GSL_SPIN_LOCK (&cqueue_trans); /* fetch new transaction */ cqueue_trans_active_head = cqueue_trans_pending_head; cqueue_trans_active_tail = cqueue_trans_pending_tail; cqueue_trans_pending_head = NULL; cqueue_trans_pending_tail = NULL; GSL_SPIN_UNLOCK (&cqueue_trans); } cqueue_trans_job = cqueue_trans_active_head ? cqueue_trans_active_head->jobs_head : NULL; } /* pick new job and out of here */ if (cqueue_trans_job) { GslJob *job = cqueue_trans_job; cqueue_trans_job = job->next; return job; } #if 0 /* wait until jobs are present */ if (max_useconds != 0) { GSL_SPIN_LOCK (&cqueue_trans); if (!cqueue_trans_pending_head) gsl_cond_wait_timed (&cqueue_trans_cond, &cqueue_trans, max_useconds); GSL_SPIN_UNLOCK (&cqueue_trans); /* there may be jobs now, start from scratch */ return op_com_pop_job_timed (max_useconds < 0 ? -1 : 0); } #endif /* time expired, no jobs... */ return NULL; } /* --- user thread garbage collection --- */ /** * gsl_engine_garbage_collect * * GSL Engine user thread function. Collects processed jobs * and transactions from the engine and frees them, this * involves callback invocation of GslFreeFunc() functions, * e.g. from gsl_job_access() or gsl_flow_job_access() * jobs. * This function may only be called from the user thread, * as GslFreeFunc() functions are guranteed to be executed * in the user thread. */ void gsl_engine_garbage_collect (void) { GslTrans *trans; EngineFlowJob *fjobs; GSL_SPIN_LOCK (&cqueue_trans); trans = cqueue_trans_trash; cqueue_trans_trash = NULL; fjobs = cqueue_trash_fjobs; cqueue_trash_fjobs = NULL; GSL_SPIN_UNLOCK (&cqueue_trans); while (trans) { GslTrans *t = trans; trans = t->cqt_next; t->cqt_next = NULL; t->jobs_tail->next = NULL; t->comitted = FALSE; _engine_free_trans (t); } while (fjobs) { EngineFlowJob *j = fjobs; fjobs = j->any.next; j->any.next = NULL; free_flow_job (j); } } /* --- node processing queue --- */ static GslMutex pqueue_mutex = { 0, }; static EngineSchedule *pqueue_schedule = NULL; static guint pqueue_n_nodes = 0; static guint pqueue_n_cycles = 0; static GslCond pqueue_done_cond = { 0, }; static EngineFlowJob *pqueue_trash_fjobs_first = NULL; static EngineFlowJob *pqueue_trash_fjobs_last = NULL; void _engine_set_schedule (EngineSchedule *sched) { g_return_if_fail (sched != NULL); g_return_if_fail (sched->secured == TRUE); GSL_SPIN_LOCK (&pqueue_mutex); if_reject (pqueue_schedule) { GSL_SPIN_UNLOCK (&pqueue_mutex); g_warning (G_STRLOC ": schedule already set"); return; } pqueue_schedule = sched; sched->in_pqueue = TRUE; GSL_SPIN_UNLOCK (&pqueue_mutex); } void _engine_unset_schedule (EngineSchedule *sched) { EngineFlowJob *trash_fjobs_first, *trash_fjobs_last; g_return_if_fail (sched != NULL); GSL_SPIN_LOCK (&pqueue_mutex); if_reject (pqueue_schedule != sched) { GSL_SPIN_UNLOCK (&pqueue_mutex); g_warning (G_STRLOC ": schedule(%p) not currently set", sched); return; } if_reject (pqueue_n_nodes || pqueue_n_cycles) g_warning (G_STRLOC ": schedule(%p) still busy", sched); sched->in_pqueue = FALSE; pqueue_schedule = NULL; trash_fjobs_first = pqueue_trash_fjobs_first; trash_fjobs_last = pqueue_trash_fjobs_last; pqueue_trash_fjobs_first = NULL; pqueue_trash_fjobs_last = NULL; GSL_SPIN_UNLOCK (&pqueue_mutex); if (trash_fjobs_first) /* move trash flow jobs */ { GSL_SPIN_LOCK (&cqueue_trans); trash_fjobs_last->any.next = cqueue_trash_fjobs; cqueue_trash_fjobs = trash_fjobs_first; GSL_SPIN_UNLOCK (&cqueue_trans); } } EngineNode* _engine_pop_unprocessed_node (void) { EngineNode *node; GSL_SPIN_LOCK (&pqueue_mutex); node = pqueue_schedule ? _engine_schedule_pop_node (pqueue_schedule) : NULL; if (node) pqueue_n_nodes += 1; GSL_SPIN_UNLOCK (&pqueue_mutex); if (node) ENGINE_NODE_LOCK (node); return node; } void _engine_push_processed_node (EngineNode *node) { g_return_if_fail (node != NULL); g_return_if_fail (pqueue_n_nodes > 0); g_return_if_fail (ENGINE_NODE_IS_SCHEDULED (node)); GSL_SPIN_LOCK (&pqueue_mutex); g_assert (pqueue_n_nodes > 0); /* paranoid */ if (node->fjob_first) /* collect trash flow jobs */ { node->fjob_last->any.next = pqueue_trash_fjobs_first; pqueue_trash_fjobs_first = node->fjob_first; if (!pqueue_trash_fjobs_last) pqueue_trash_fjobs_last = node->fjob_last; node->fjob_first = NULL; node->fjob_last = NULL; } pqueue_n_nodes -= 1; ENGINE_NODE_UNLOCK (node); if (!pqueue_n_nodes && !pqueue_n_cycles && GSL_SCHEDULE_NONPOPABLE (pqueue_schedule)) gsl_cond_signal (&pqueue_done_cond); GSL_SPIN_UNLOCK (&pqueue_mutex); } GslRing* _engine_pop_unprocessed_cycle (void) { return NULL; } void _engine_push_processed_cycle (GslRing *cycle) { g_return_if_fail (cycle != NULL); g_return_if_fail (pqueue_n_cycles > 0); g_return_if_fail (ENGINE_NODE_IS_SCHEDULED (cycle->data)); } void _engine_wait_on_unprocessed (void) { GSL_SPIN_LOCK (&pqueue_mutex); while (pqueue_n_nodes || pqueue_n_cycles || !GSL_SCHEDULE_NONPOPABLE (pqueue_schedule)) gsl_cond_wait (&pqueue_done_cond, &pqueue_mutex); GSL_SPIN_UNLOCK (&pqueue_mutex); } /* --- initialization --- */ void _gsl_init_engine_utils (void) { static gboolean initialized = FALSE; g_assert (initialized == FALSE); /* single invocation */ initialized++; gsl_mutex_init (&cqueue_trans); gsl_cond_init (&cqueue_trans_cond); gsl_mutex_init (&pqueue_mutex); gsl_cond_init (&pqueue_done_cond); }