EGOCMS  24.0
EGOTEC Content-Management-System
Ego_Queue.php
gehe zur Dokumentation dieser Datei
1 <?php
/**
 * Static, per-process queue of deferred callbacks.
 *
 * Callbacks are collected with add() and executed in order by exec(). Once
 * execution has started (or setExecuting(true) was called), add() no longer
 * queues anything but runs the callback immediately.
 */
class Ego_Queue {
	/** @var array[] Pending entries, each of the form array('call' => callable, 'params' => array). */
	private static $queue = array();

	/** @var bool[] Hashes (see getHash()) of entries that have been queued, used to drop duplicates. */
	private static $queueMd5 = array();

	/** @var int Largest queue size observed by log(). */
	private static $max = 0;

	/** @var float|int Timestamp (microtime) at which exec() started processing; 0 until then. */
	private static $start = 0;

	/** @var float|int Timestamp (microtime) of the most recent 'execStart' log event. */
	private static $execStart = 0;

	/** @var array[] Per-callable statistics collected by log(): 'count' (adds) and 'time' (seconds spent). */
	private static $stats = [];

	/** @var bool While true, add() executes callbacks immediately instead of queueing them. */
	private static $executing = false;

	/**
	 * Number of entries currently waiting in the queue.
	 *
	 * @return int
	 */
	public static function size() {
		return count(self::$queue);
	}

	/**
	 * Switch between queueing mode (false) and immediate execution mode (true).
	 *
	 * @param bool $b New value of the executing flag.
	 */
	public static function setExecuting($b) {
		self::$executing = $b;
	}

	/**
	 * Build the duplicate-detection key for a queue entry.
	 *
	 * The key is the MD5 of the entry's print_r() output, so two entries count
	 * as identical when they print identically.
	 * NOTE(review): distinct objects or closures that print the same are
	 * therefore treated as duplicates - confirm this is intended.
	 *
	 * @param array $entry Queue entry (array('call' => ..., 'params' => ...)).
	 * @return string
	 */
	private static function getHash($entry) {
		return md5(print_r($entry, true));
	}

	/**
	 * Queue a callback, or run it right away if the queue is already executing.
	 *
	 * An entry whose hash has already been queued is silently ignored.
	 *
	 * @param callable $call   Callback to invoke.
	 * @param array    $params Arguments passed to the callback.
	 * @param bool     $first  Put the entry at the front of the queue instead of the end.
	 */
	public static function add(callable $call, $params = array(), $first = false) {
		$entry = array(
			'call' => $call,
			'params' => $params
		);

		// Once the queue is being processed, new entries are executed immediately.
		if (self::$executing) {
			self::call($entry);
			return;
		}

		// Avoid executing the same call with the same parameters more than once.
		// isset() keeps the lookup from reading an undefined array key.
		$md5 = self::getHash($entry);
		if (isset(self::$queueMd5[$md5])) {
			return;
		}
		self::$queueMd5[$md5] = true;

		if ($first) {
			array_unshift(self::$queue, $entry);
		} else {
			array_push(self::$queue, $entry);
		}

		self::log('add', $entry);
	}

	/**
	 * Remove the first queued entry that is identical (===) to the given call and parameters.
	 *
	 * The entry's hash is released as well, so the same call can be queued again.
	 *
	 * @param callable $call   Callback of the entry to remove.
	 * @param array    $params Arguments of the entry to remove.
	 */
	public static function remove(callable $call, $params = array()) {
		$entry = array(
			'call' => $call,
			'params' => $params
		);

		// Strict search: same comparison (===) the entries were matched with before.
		// The queue is only modified through unshift/push/shift/splice, so keys are offsets.
		$offset = array_search($entry, self::$queue, true);
		if ($offset === false) {
			return;
		}

		array_splice(self::$queue, $offset, 1);
		unset(self::$queueMd5[self::getHash($entry)]);
	}

	/**
	 * Execute all queued entries in order.
	 *
	 * Does nothing when the queue is empty. Otherwise the executing flag is set
	 * (and left set), so callbacks added from now on run immediately.
	 *
	 * @param bool $close_request Call fastcgi_finish_request() (if available) before processing.
	 * @param bool $flush         Call Ego_System::flush() before processing.
	 */
	public static function exec($close_request = true, $flush=true) {
		if (count(self::$queue) <= 0) { // Nothing queued.
			return;
		}
		self::$executing = true;

		if ($flush) {
			Ego_System::flush(); // Send all pending output first ...
		}
		if ($close_request && function_exists('fastcgi_finish_request')) {
			fastcgi_finish_request(); // ... then close the connection and keep working.
		}
		self::$start = microtime(true);

		// Read the flag once so the increment and decrement below always pair up,
		// and so an unset configuration key is not read directly.
		$monitor = !empty($GLOBALS['egotec_conf']['monitor']);

		if ($monitor) {
			apcu_inc('queue_current_exec'); // Queues currently being executed.
			apcu_inc('queue_count_exec'); // Total number of queue executions.
			apcu_inc('queue_size_exec', count(self::$queue)); // Accumulated number of queued entries.
		}

		while ($entry = array_shift(self::$queue)) {
			self::call($entry);
		}

		if ($monitor) {
			$d = microtime(true) - self::$start;
			apcu_dec('queue_current_exec'); // This queue is no longer running.
			// NOTE(review): $d is a float number of seconds, while apcu_inc() documents an
			// integer step - sub-second durations are presumably truncated; confirm the intended unit.
			apcu_inc('queue_duration_exec', $d); // Accumulated execution time.
		}
	}

	/**
	 * Invoke a single queue entry; entries that are not callable are skipped.
	 *
	 * Anything thrown by the callback is written to the error log instead of being
	 * propagated. Throwable is caught (not just Exception) so that a PHP Error such
	 * as a TypeError in one callback cannot abort the remaining queue.
	 *
	 * @param array $entry Queue entry (array('call' => ..., 'params' => ...)).
	 */
	private static function call($entry) {
		if (!@is_callable($entry['call'])) {
			return;
		}

		self::log('execStart', $entry);
		try {
			call_user_func_array($entry['call'], $entry['params']);
		} catch (Throwable $e) {
			egotec_error_log(
				"queue call exception\n".
				$e->getMessage()."\n".
				$e->getFile()." ".$e->getLine()."\n".
				$e->getTraceAsString()
			);
		}
		self::log('execEnd', $entry);
	}

	/**
	 * Write queue diagnostics to a per-process file when $GLOBALS['egotec_conf']['log_queue'] is enabled.
	 *
	 * The file (log_dir . 'q' . PID) is overwritten on every call and contains the
	 * current queue size, the maximum size seen, the time since exec() started,
	 * the event description and the collected statistics.
	 *
	 * @param string $method Event name: 'add', 'execStart' or 'execEnd'.
	 * @param array  $entry  Queue entry the event refers to.
	 */
	private static function log($method, $entry) {
		if (empty($GLOBALS['egotec_conf']['log_queue'])) {
			return;
		}

		if ($method == 'execStart') {
			self::$execStart = microtime(true);
		}

		// Derive a readable name for the callable. $name2 is an optional suffix that
		// identifies the concrete object; it must exist for every kind of callable.
		$call = $entry['call'];
		$name2 = '';
		if (is_array($call)) {
			$name2 = ' ';
			if (is_string($call[0])) {
				$name = $call[0].' '.$call[1];
			} else {
				$class = get_class($call[0]);
				switch ($class) {
					case 'Site':
						$name2.= $call[0]->name;
						break;
					case 'Ego_Cache_apcu':
					case 'Ego_Cache_custom':
					case 'Ego_Cache_file':
						$name2.= $call[0]->getPath();
						break;
					case 'Ego_Search_Lucene':
						$name2.= $call[0]->getConfig()['table'];
						break;
					case 'Ego_Search_Elastic':
						$name2.= $call[0]->getConfig()['index'];
						break;
					default:
				}
				$name = $class." ".print_r($call[1], true);
			}
		} elseif (is_object($call)) {
			$name = get_class($call);
		} else {
			$name = $call;
		}
		$s = "Ego_Queue::$method\n".$name.$name2."\n";

		if ($method == 'add') {
			if (!isset(self::$stats[$name]['count'])) {
				self::$stats[$name]['count'] = 0;
			}
			self::$stats[$name]['count']++;
		}

		if ($method == 'execEnd') {
			if (!isset(self::$stats[$name]['time'])) {
				self::$stats[$name]['time'] = 0;
			}
			self::$stats[$name]['time']+= microtime(true) - self::$execStart;

			// Sort by time spent, slowest first. The comparator must return an integer:
			// a float difference would be cast to int, making sub-second gaps compare equal.
			uasort(self::$stats, function ($a, $b) {
				$timeA = isset($a['time']) ? $a['time'] : 0;
				$timeB = isset($b['time']) ? $b['time'] : 0;
				if ($timeA == $timeB) {
					return 0;
				}
				return ($timeB > $timeA) ? 1 : -1;
			});
		}

		$size = count(self::$queue);
		if ($size > self::$max) {
			self::$max = $size;
		}
		$d = self::$start ? microtime(true) - self::$start : 0;
		file_put_contents(
			$GLOBALS['egotec_conf']['log_dir'] . 'q' . getmypid(),
			$size . ' ' . self::$max . ' ' . round($d, 3) . ' ' . $s . "\n" .
			print_r(self::$stats, true)
		);
	}
}
// Process whatever is still queued when the script ends; exec() runs with its default arguments.
register_shutdown_function(['Ego_Queue', 'exec']);
static add(callable $call, $params=array(), $first=false)
Definition: Ego_Queue.php:72
static size()
Definition: Ego_Queue.php:25
static setExecuting($b)
Definition: Ego_Queue.php:35
static exec($close_request=true, $flush=true)
Definition: Ego_Queue.php:128
static flush($string='')
Definition: Ego_System.php:934