Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F7170502
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
26 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/library/rating_server.php b/library/rating_server.php
index e16ad17..62ca12c 100644
--- a/library/rating_server.php
+++ b/library/rating_server.php
@@ -1,760 +1,760 @@
<?php
if (function_exists('pcntl_async_signals')) {
pcntl_async_signals(true);
} else {
declare(ticks = 1);
}
function signalHandler($sig)
{
$log = sprintf("Received signal %s", $sig);
logger($log);
switch ($sig) {
case SIGTERM:
logger("Rating Engine is exiting ...");
exit(0);
case SIGKILL:
logger("Rating Engine is exiting ...");
exit(0);
case SIGUSR1:
break;
default:
// handle all other signals
}
}
use Monolog\Handler\StreamHandler;
use Monolog\Formatter\LineFormatter;
class Daemon
{
public $pidFile;
public function __construct($pidFile = false)
{
$this->pidFile = $pidFile;
}
public function start()
{
global $logger;
$console_log = new StreamHandler('php://stdout');
$formatter = new LineFormatter("[%level_name%] %channel%: %message% %extra%", null, true, true);
$console_log->setFormatter($formatter);
$logger->pushHandler($console_log);
if ($this->pidFile !== false && file_exists($this->pidFile)) {
$pf = fopen($this->pidFile, 'r');
if (!$pf) {
$log = sprintf("Error: Unable to read pidfile %s\n", $this->pidFile);
critical($log);
exit(-1);
}
$c = fgets($pf);
fclose($pf);
if ($c === false) {
$log = sprintf("Error: Unable to read pidfile %s\n", $this->pidFile);
critical($log);
exit(-1);
}
$pid = intval($c);
if (posix_kill($pid, 0) === true) {
$log = sprintf("Error: Another process is already running on pid %d\n", $pid);
critical($log);
exit(-1);
}
}
// do the Unix double fork magic
$pid = pcntl_fork();
if ($pid == -1) {
$log = sprintf("Error: Couldn't fork!\n");
critical($log);
exit(-1);
}
if ($pid > 0) {
// this is the parent. nothing to do
exit(0);
}
// decouple from the parent environment
chdir('/');
if (posix_setsid() == -1) {
$log=sprintf("Error: Could not detach from terminal\n");
critical($log);
exit(-1);
}
umask(022);
// and now the second fork
$pid = pcntl_fork();
if ($pid == -1) {
$log = sprintf("Error: Couldn't fork!\n");
critical($log);
exit(-1);
}
if ($pid > 0) {
// this is the parent. nothing to do
exit(0);
}
// this doesn't really work. it seems php is unable to close these
//fclose(STDIN);
//fclose(STDOUT);
//fclose(STDERR);
if ($this->pidFile) {
$pf = fopen($this->pidFile, 'w');
if ($pf === false) {
$log = sprintf("Error: Unable to write pidfile %s\n", $this->pidFile);
critical($log);
exit(-1);
}
fwrite($pf, sprintf("%d\n", posix_getpid()));
fclose($pf);
register_shutdown_function(array(&$this, "removePid"));
}
//pcntl_signal(SIGCHLD, array(&$this, 'signalHandler'));
//pcntl_signal(SIGTERM, array(&$this, 'signalHandler'));
//pcntl_signal(SIGKILL, array(&$this, 'signalHandler'));
// for some reason these interfere badly with socket_select()
pcntl_signal(SIGTERM, "signalHandler", true);
// pcntl_signal(SIGKILL, "signalHandler", true);
pcntl_signal(SIGUSR1, "signalHandler", true);
logger("Rating Engine moved to the background\n");
$logger->popHandler();
}
private function removePid()
{
if (file_exists($this->pidFile)) unlink($this->pidFile);
}
}
/*
phpSocketDaemon 1.0
Copyright (C) 2006 Chris Chabot <chabotc@xs4all.nl>
See http://www.chabotc.nl/ for more information
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
class SocketClient extends SocketCDR
{
public $connecting = false;
public $disconnected = false;
public $read_buffer = '';
public $write_buffer = '';
public function connect($remote_address, $remote_port)
{
$this->connecting = true;
try {
parent::connect($remote_address, $remote_port);
} catch (SocketException $e) {
echo "Caught exception: ".$e->getMessage()."\n";
}
}
public function write($buffer, $length = 4096)
{
$this->write_buffer .= $buffer;
$this->doWrite();
}
public function doWrite()
{
$length = strlen($this->write_buffer);
try {
$written = parent::write($this->write_buffer, $length);
if ($written < $length) {
$this->write_buffer = substr($this->write_buffer, $written);
} else {
$this->write_buffer = '';
}
$this->onWrite();
return true;
} catch (SocketException $e) {
$old_socket = ($this->socket instanceof \Socket ? spl_object_id($this->socket) : (int)$this->socket);
$this->close();
$this->socket = $old_socket;
$this->disconnected = true;
$this->onDisconnect();
return false;
}
return false;
}
public function read($length = 4096)
{
try {
$this->read_buffer .= parent::read($length);
$this->onRead();
} catch (SocketException $e) {
$old_socket = ($this->socket instanceof \Socket ? spl_object_id($this->socket) : (int)$this->socket);
$this->close();
$this->socket = $old_socket;
$this->disconnected = true;
$this->onDisconnect();
}
}
public function onConnect()
{
}
public function onDisconnect()
{
}
public function onRead()
{
}
public function onWrite()
{
}
public function onTimer()
{
}
}
class SocketServer extends SocketCDR
{
public $startTime;
protected $client_class;
public function __construct($client_class, $bind_address = 0, $bind_port = 0, $domain = AF_INET, $type = SOCK_STREAM, $protocol = SOL_TCP)
{
parent::__construct($bind_address, $bind_port, $domain, $type, $protocol);
$this->client_class = $client_class;
$this->listen();
if (!$bind_address) {
$bind_address_print='0.0.0.0';
} else {
$bind_address_print=$bind_address;
}
$this->startTime=time();
$log=sprintf("Rating Engine listening on %s:%s", $bind_address_print, $bind_port);
logger($log);
}
public function accept()
{
$client = new $this->client_class(parent::accept(), $this);
if (!is_subclass_of($client, 'SocketServerClient')) {
throw new SocketException("Invalid serverClient class specified! Has to be a subclass of SocketServerClient");
}
$this->onAccept($client);
return $client;
}
// override if desired
public function onAccept(SocketServerClient $client)
{
}
}
class SocketServerClient extends SocketClient
{
public $socket;
public $remote_address;
public $remote_port;
public $local_addr;
public $local_port;
public $parentServer;
public $ratingEngine;
public $ratingEngineSettings;
public function __construct($socket, $parentServer)
{
$this->socket = $socket;
$this->parentServer = &$parentServer;
if (!is_resource($this->socket) && !($this->socket instanceof \Socket)) {
throw new SocketException("Invalid socket or resource");
} elseif (!socket_getsockname($this->socket, $this->local_addr, $this->local_port)) {
throw new SocketException("Could not retrieve local address & port: ".socket_strerror(socket_last_error($this->socket)));
} elseif (!socket_getpeername($this->socket, $this->remote_address, $this->remote_port)) {
throw new SocketException("Could not retrieve remote address & port: ".socket_strerror(socket_last_error($this->socket)));
}
global $RatingEngineServer, $RatingEngine;
$this->ratingEngine = & $RatingEngineServer;
$this->ratingEngineSettings = $RatingEngine;
$this->setNonBlock();
$this->onConnect();
}
}
class SocketDaemon
{
public $servers = array();
public $clients = array();
public function createServer($server_class, $client_class, $bind_address = 0, $bind_port = 0)
{
$server = new $server_class($client_class, $bind_address, $bind_port);
if (!is_subclass_of($server, 'SocketServer')) {
throw new SocketException("Invalid server class specified! Has to be a subclass of SocketServer");
}
$this->servers[$this->getSocketId($server->socket)] = $server;
return $server;
}
public function createClient($client_class, $remote_address, $remote_port, $bind_address = 0, $bind_port = 0)
{
$client = new $client_class($bind_address, $bind_port);
if (!is_subclass_of($client, 'SocketClient')) {
throw new SocketException("Invalid client class specified! Has to be a subclass of SocketClient");
}
$client->setNonBlock(true);
$client->connect($remote_address, $remote_port);
$this->clients[$this->getSocketId($client->socket)] = $client;
return $client;
}
private function getSocketId($socket)
{
return $socket instanceof \Socket ? spl_object_id($socket) : (int)$socket;
}
private function createReadSet()
{
$ret = array();
foreach ($this->clients as $socket) {
$ret[] = $socket->socket;
}
foreach ($this->servers as $socket) {
$ret[] = $socket->socket;
}
return $ret;
}
private function createWriteSet()
{
$ret = array();
foreach ($this->clients as $socket) {
if (!empty($socket->write_buffer) || $socket->connecting) {
$ret[] = $socket->socket;
}
}
foreach ($this->servers as $socket) {
if (!empty($socket->write_buffer)) {
$ret[] = $socket->socket;
}
}
return $ret;
}
private function createExceptionSet()
{
$ret = array();
foreach ($this->clients as $socket) {
$ret[] = $socket->socket;
}
foreach ($this->servers as $socket) {
$ret[] = $socket->socket;
}
return $ret;
}
private function cleanSockets()
{
foreach ($this->clients as $socket) {
if ($socket->disconnected || (!is_resource($socket) && !$socket->socket instanceof \Socket)) {
if (isset($this->clients[$this->getSocketId($socket->socket)])) {
unset($this->clients[$this->getSocketId($socket->socket)]);
}
}
}
}
private function getClass($socket)
{
if (isset($this->clients[$this->getSocketId($socket)])) {
return $this->clients[$this->getSocketId($socket)];
} elseif (isset($this->servers[$this->getSocketId($socket)])) {
return $this->servers[$this->getSocketId($socket)];
} else {
throw (new SocketException("Could not locate socket class for $socket"));
}
}
public function process()
{
// if SocketClient is in write set, and $socket->connecting === true, set connecting to false and call onConnect
$read_set = $this->createReadSet();
$write_set = $this->createWriteSet();
$exception_set = $this->createExceptionSet();
$event_time = time();
while (($events = socket_select($read_set, $write_set, $exception_set, 2)) !== false) {
if ($events > 0) {
foreach ($read_set as $socket) {
$socket = $this->getClass($socket);
if (is_subclass_of($socket, 'SocketServer')) {
$client = $socket->accept();
$this->clients[$this->getSocketId($client->socket)] = $client;
} elseif (is_subclass_of($socket, 'SocketClient')) {
// regular onRead event
$socket->read();
}
}
foreach ($write_set as $socket) {
$socket = $this->getClass($socket);
if (is_subclass_of($socket, 'SocketClient')) {
if ($socket->connecting === true) {
$socket->onConnect();
$socket->connecting = false;
}
$socket->doWrite();
}
}
foreach ($exception_set as $socket) {
$socket = $this->getClass($socket);
if (is_subclass_of($socket, 'SocketClient')) {
$socket->onDisconnect();
if (isset($this->clients[$this->getSocketId($socket->socket)])) {
unset($this->clients[$this->getSocketId($socket->socket)]);
}
}
}
}
if (time() - $event_time > 1) {
// only do this if more then a second passed, else we'd keep looping this for every bit received
foreach ($this->clients as $socket) {
$socket->onTimer();
}
$event_time = time();
}
$this->cleanSockets();
$read_set = $this->createReadSet();
$write_set = $this->createWriteSet();
$exception_set = $this->createExceptionSet();
}
}
}
class SocketException extends Exception
{
}
class SocketCDR
{
public $socket;
public $bind_address;
public $bind_port;
public $domain;
public $type;
public $protocol;
public $local_addr;
public $local_port;
public $read_buffer = '';
public $write_buffer = '';
public $remote_address = null;
public $remote_port = null;
private function getSocketId($socket)
{
return $socket instanceof \Socket ? spl_object_id($socket) : (int)$socket;
}
public function __construct($bind_address = 0, $bind_port = 0, $domain = AF_INET, $type = SOCK_STREAM, $protocol = SOL_TCP)
{
$this->bind_address = $bind_address;
$this->bind_port = $bind_port;
$this->domain = $domain;
$this->type = $type;
$this->protocol = $protocol;
if (($this->socket = @socket_create($domain, $type, $protocol)) === false) {
throw new SocketException("Could not create socket: ".socket_strerror(socket_last_error($this->socket)));
}
if (!@socket_set_option($this->socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
throw new SocketException("Could not set SO_REUSEADDR: ".$this->getError());
}
if (!@socket_bind($this->socket, $bind_address, $bind_port)) {
throw new SocketException("Could not bind socket to [$bind_address - $bind_port]: ".socket_strerror(socket_last_error($this->socket)));
}
if (!@socket_getsockname($this->socket, $this->local_addr, $this->local_port)) {
throw new SocketException("Could not retrieve local address & port: ".socket_strerror(socket_last_error($this->socket)));
}
$this->setNonBlock(true);
}
public function __destruct()
{
if (is_resource($this->socket) || $this->socket instanceof \Socket) {
$this->close();
}
}
public function getError()
{
$error = socket_strerror(socket_last_error($this->socket));
socket_clear_error($this->socket);
return $error;
}
public function close()
{
if (is_resource($this->socket) || $this->socket instanceof \Socket) {
@socket_shutdown($this->socket, 2);
@socket_close($this->socket);
}
$this->socket = $this->getSocketId($this->socket);
}
public function write($buffer, $length = 4096)
{
if (!is_resource($this->socket) && !($this->socket instanceof \Socket)) {
throw new SocketException("Invalid socket or resource");
} elseif (($ret = @socket_write($this->socket, $buffer, $length)) === false) {
throw new SocketException("Could not write to socket: ".$this->getError());
}
return $ret;
}
public function read($length = 4096)
{
if (!is_resource($this->socket) && !($this->socket instanceof \Socket)) {
throw new SocketException("Invalid socket or resource");
} elseif (($ret = @socket_read($this->socket, $length, PHP_BINARY_READ)) == false) {
throw new SocketException("Could not read from socket: ".$this->getError());
}
return $ret;
}
public function connect($remote_address, $remote_port)
{
$this->remote_address = $remote_address;
$this->remote_port = $remote_port;
if (!is_resource($this->socket) && !($this->socket instanceof \Socket)) {
throw new SocketException("Invalid socket or resource");
} elseif (!@socket_connect($this->socket, $remote_address, $remote_port)) {
throw new SocketException("Could not connect to {$remote_address} - {$remote_port}: ".$this->getError());
}
}
public function listen($backlog = 128)
{
if (!is_resource($this->socket) && !($this->socket instanceof \Socket)) {
throw new SocketException("Invalid socket or resource");
} elseif (!@socket_listen($this->socket, $backlog)) {
throw new SocketException("Could not listen to {$this->bind_address} - {$this->bind_port}: ".$this->getError());
}
}
public function accept()
{
if (!is_resource($this->socket) && !($this->socket instanceof \Socket)) {
throw new SocketException("Invalid socket or resource");
} elseif (($client = socket_accept($this->socket)) === false) {
throw new SocketException("Could not accept connection to {$this->bind_address} - {$this->bind_port}: ".$this->getError());
}
return $client;
}
public function setNonBlock()
{
if (!is_resource($this->socket) && !($this->socket instanceof \Socket)) {
throw new SocketException("Invalid socket or resource");
} elseif (!@socket_set_nonblock($this->socket)) {
throw new SocketException("Could not set socket non_block: ".$this->getError());
}
}
public function setBlock()
{
if (!is_resource($this->socket) && !($this->socket instanceof \Socket)) {
throw new SocketException("Invalid socket or resource");
} elseif (!@socket_set_block($this->socket)) {
throw new SocketException("Could not set socket non_block: ".$this->getError());
}
}
public function setRecieveTimeout($sec, $usec)
{
if (!is_resource($this->socket) && !($this->socket instanceof \Socket)) {
throw new SocketException("Invalid socket or resource");
} elseif (!@socket_set_option($this->socket, SOL_SOCKET, SO_RCVTIMEO, array("sec" => $sec, "usec" => $usec))) {
throw new SocketException("Could not set socket recieve timeout: ".$this->getError());
}
}
public function setReuseAddress($reuse = true)
{
$reuse = $reuse ? 1 : 0;
if (!is_resource($this->socket) && !($this->socket instanceof \Socket)) {
throw new SocketException("Invalid socket or resource");
} elseif (!@socket_set_option($this->socket, SOL_SOCKET, SO_REUSEADDR, $reuse)) {
throw new SocketException("Could not set SO_REUSEADDR to '$reuse': ".$this->getError());
}
}
}
class RatingEngineServer extends SocketServer
{
public $requests = array();
public $connected_clients = array();
}
class RatingEngineClient extends SocketServerClient
{
private function handleRequest($request)
{
$this->parentServer->requests[$this->remote_address]++;
$b = microtime(true);
$output = $this->ratingEngine->processNetworkInput($request, $this->remote_address);
$output .= "\n\n";
if ($this->ratingEngineSettings['log_delay']) {
$e=microtime(true);
$d=$e-$b;
if ($d >= $this->ratingEngineSettings['log_delay']) {
$log = sprintf("%s request took %.4f seconds", $this->ratingEngine->method, $d);
logger($log);
}
}
// $log = sprintf("Output %s", $output);
// logger($log);
return $output;
}
public function onRead()
{
$tinput = trim($this->read_buffer);
if ($tinput == 'exit' || $tinput =='quit') {
$this->onDisconnect();
$this->close();
} elseif (strtolower($tinput) == 'showclients') {
$output = '';
$j = 0;
$uptime=time()-$this->parentServer->startTime;
if (count($this->parentServer->connected_clients)) {
$output .= "\nClients:\n\n";
foreach ($this->parentServer->connected_clients as $_client) {
$j++;
$myself=$this->remote_address.":".$this->remote_port;
if ($_client == $myself) {
$output .= sprintf("%d. %s (myself)\n", $j, $_client);
} else {
$output .= sprintf("%d. %s\n", $j, $_client);
}
}
}
if (count($this->parentServer->requests)) {
$output .= "\nRequests:\n\n";
$requests=0;
foreach (array_keys($this->parentServer->requests) as $_client_ip) {
$output .= sprintf("%d requests from %s\n", $this->parentServer->requests[$_client_ip], $_client_ip);
$requests = $requests + $this->parentServer->requests[$_client_ip];
}
}
$output .= "\nStatistics:\n\n";
$output .= sprintf("Total requests: %d\n", $requests);
$output .= sprintf("Uptime: %d seconds\n", $uptime);
if ($uptime) $output .= sprintf("Load: %0.2f/s\n", $requests / $uptime);
$output .= "\n\n";
$this->write($output);
$this->read_buffer = '';
} elseif ($tinput) {
$this->write($this->handleRequest($tinput));
$this->read_buffer = '';
}
}
public function onConnect()
{
if ($this->remote_address != '127.0.0.1') {
if (is_array($this->ratingEngineSettings['allow'])) {
$allow_connection = false;
foreach ($this->ratingEngineSettings['allow'] as $_allow) {
if (preg_match("/^$_allow/", $this->remote_address)) {
- $log = sprintf("Client %s allowed by server configuration %s", $this->remote_address, $_allow);
+ $log = sprintf("Client IP %s allowed by ACL [%s]", $this->remote_address, $_allow);
logger($log);
$allow_connection = true;
break;
}
}
if (!$allow_connection) {
- $log = sprintf("Client %s disallowed by server configuration", $this->remote_address);
+ $log = sprintf("Client IP %s disallowed by ACL", $this->remote_address);
logger($log);
$this->close();
return true;
}
}
}
$_client = $this->remote_address.":".$this->remote_port;
$this->parentServer->connected_clients[] = $_client;
$this->parentServer->connected_clients = array_unique($this->parentServer->connected_clients);
- $log = sprintf("Client connection from %s:%s", $this->remote_address, $this->remote_port);
+ $log = sprintf("Client connected from %s:%s", $this->remote_address, $this->remote_port);
logger($log);
}
public function onDisconnect()
{
$new_clients = array();
foreach ($this->parentServer->connected_clients as $_client) {
$_connected_client=$this->remote_address.":".$this->remote_port;
if ($_connected_client == $_client) continue;
$new_clients[]=$_client;
}
$this->parentServer->connected_clients=array_unique($new_clients);
- $log = sprintf("Client disconnection from %s:%s", $this->remote_address, $this->remote_port);
+ $log = sprintf("Client disconnected from %s:%s", $this->remote_address, $this->remote_port);
logger($log);
}
public function onWrite()
{
}
public function onTimer()
{
}
}
?>
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Tue, Nov 26, 4:48 AM (1 d, 19 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3414150
Default Alt Text
(26 KB)
Attached To
Mode
rCDRT CDRTool
Attached
Detach File
Event Timeline
Log In to Comment