14 private static $queue = array();
15 private static $queueMd5 = array();
16 private static $max = 0;
17 private static $start = 0;
18 private static $execStart = 0;
19 private static $stats = [];
20 private static $executing =
false;
25 public static function size() {
26 return sizeof(Ego_Queue::$queue);
36 self::$executing = $b;
45 private static function getHash($entry) {
46 return md5(print_r($entry,
true));
72 public static function add(callable $call, $params = array(), $first =
false) {
79 if (self::$executing) {
85 $md5 = self::getHash($entry);
86 if (self::$queueMd5[$md5]) {
89 self::$queueMd5[$md5] =
true;
92 array_unshift(self::$queue, $entry);
94 array_push(self::$queue, $entry);
97 self::log(
'add', $entry);
107 public static function remove(callable $call, $params = array()) {
108 if (
sizeof(self::$queue) > 0) {
113 foreach (self::$queue as $offset => $queue) {
114 if ($queue === $entry) {
115 array_splice(self::$queue, $offset, 1);
116 unset(self::$queueMd5[self::getHash($entry)]);
128 public static function exec($close_request =
true, $flush=
true) {
129 if (
sizeof(self::$queue) <= 0) {
132 self::$executing =
true;
137 if ( $close_request && function_exists(
'fastcgi_finish_request') ) {
138 fastcgi_finish_request();
140 self::$start = microtime(
true);
142 if ($GLOBALS[
'egotec_conf'][
'monitor']) {
143 apcu_inc(
'queue_current_exec');
144 apcu_inc(
'queue_count_exec');
145 apcu_inc(
'queue_size_exec',
sizeof(self::$queue));
148 while ($entry = array_shift(self::$queue)) {
152 if ($GLOBALS[
'egotec_conf'][
'monitor']) {
153 $end = microtime(
true);
154 $d = $end - self::$start;
155 apcu_dec(
'queue_current_exec');
156 apcu_inc(
'queue_duration_exec', $d);
166 private static function call($entry) {
167 if (@is_callable($entry[
'call'])) {
168 self::log(
'execStart', $entry);
170 call_user_func_array($entry[
'call'], $entry[
'params']);
171 }
catch (Exception $e) {
173 "queue call exception\n".
174 $e->getMessage().
"\n".
175 $e->getFile().
" ".$e->getLine().
"\n".
176 $e->getTraceAsString()
179 self::log(
'execEnd', $entry);
183 private static function log($method, $entry) {
184 if (!$GLOBALS[
'egotec_conf'][
'log_queue'])
return;
186 if ($method==
'execStart') {
187 self::$execStart = microtime(
true);
190 $s =
"Ego_Queue::$method\n";
191 if (is_array($entry[
'call'])) {
193 if (is_string($entry[
'call'][0])) {
194 $name = $entry[
'call'][0].
' '.$entry[
'call'][1];
196 $class = get_class($entry[
'call'][0]);
199 $name2.= $entry[
'call'][0]->name;
201 case 'Ego_Cache_apcu':
202 case 'Ego_Cache_custom':
203 case 'Ego_Cache_file':
204 $name2.= $entry[
'call'][0]->getPath();
206 case 'Ego_Search_Lucene':
207 $name2.= $entry[
'call'][0]->getConfig()[
'table'];
209 case 'Ego_Search_Elastic':
210 $name2.= $entry[
'call'][0]->getConfig()[
'index'];
214 $name = $class.
" ".print_r($entry[
'call'][1],
true);
216 } elseif (is_object($entry[
'call'])) {
217 $name = get_class($entry[
'call']);
219 $name = $entry[
'call'];
221 $s.= $name.$name2.
"\n";
223 if ($method==
'add') {
224 self::$stats[$name][
'count']++;
227 if ($method==
'execEnd') {
228 self::$stats[$name][
'time']+= microtime(
true)-self::$execStart;
229 uasort(self::$stats,
function($a, $b) {
return $b[
'time']-$a[
'time']; } );
232 $size =
sizeof(self::$queue);
233 if ($size>self::$max) {
236 $d = self::$start ? microtime(
true) - self::$start:0;
238 $GLOBALS[
'egotec_conf'][
'log_dir'] .
'q' . getmypid(),
239 $size .
' ' . self::$max .
' ' . round($d, 3) .
' ' . $s .
"\n" .
240 print_r(self::$stats,
true)
244 register_shutdown_function(array(
'Ego_Queue',
'exec'));
static add(callable $call, $params=array(), $first=false)
static exec($close_request=true, $flush=true)