Skip to main content

CodeIgniter 3 Daemon Queue Worker (Consumer) Management Controller.

<?php

namespace yidas\queue\worker;

use Exception;
use CI_Controller;

/**
 * Worker Manage Controller
 * 
 * @author  Nick Tsai <myintaer@gmail.com>
 * @version 1.0.1
 * @todo    Implement by using pcntl
 */
class Controller extends CI_Controller
{
    /**
     * Debug mode
     *
     * @var boolean
     */
    public $debug = true;

    /**
     * Log file path
     *
     * @var string
     */
    public $logPath;

    /**
     * PHP CLI command for current environment
     *
     * @var string
     */
    public $phpCommand = 'php';

    /**
     * Time interval of listen frequency on idle
     *
     * @var integer Seconds
     */
    public $listenerSleep = 3;

    /**
     * Time interval of worker processes frequency
     * 
     * The time between a job handle done and the next job catch
     *
     * @var integer Seconds
     */
    public $workerSleep = 0;

    /**
     * Number of max workers
     *
     * @var integer
     */
    public $workerMaxNum = 4;

    /**
     * Number of workers at start, less than or equal to $workerMaxNum
     *
     * @var integer
     */
    public $workerStartNum = 1;

    /**
     * Waiting time between worker started and next worker starting
     *
     * @var integer Seconds
     */
    public $workerWaitSeconds = 10;

    /**
     * Enable worker health check for listener
     *
     * @var boolean
     */
    public $workerHeathCheck = true;

    /**
     * Time interval of single processes frequency
     *
     * @var integer Seconds
     */
    public $singleSleep = 3;

    /**
     * Single process unique lock time for unexpected shutdown
     *
     * @var integer Seconds
     */
    public $singleLockTimeout = 15;

    /**
     * Descriptorspec for proc_open()
     *
     * @var array
     * @see http://php.net/manual/en/function.proc-open.php
     */
    protected static $_procDescriptorspec = [
        ["pipe", "r"],
        ["pipe", "w"],
        ["pipe", "w"],
        ];

    /**
     * Static listener object for injecting into customized callback process
     *
     * @var object
     */
    protected $_staticListen;

    /**
     * Static worker object for injecting into customized callback process
     *
     * @var object
     */
    protected $_staticWork;

    /**
     * Static single object for injecting into customized callback process
     *
     * @var object
     */
    protected $_staticSingle;

    /**
     * Worker process running stack
     *
     * @var array Worker ID => OS PID
     */
    protected $_pidStack = [];
    
    function __construct() 
    {
        // CLI only
        if (php_sapi_name() != "cli") {
            die('Access denied');
        }

        parent::__construct();

        // Init constructor hook
        if (method_exists($this, 'init')) {
            // You may need to set config to prevent any continuous growth usage 
            // such as `$this->db->save_queries = false;`
            return $this->init();
        }
    }

    /**
     * Action for activating a worker listener
     *
     * @return void
     */
    public function listen()
    {
        // Env check
        if (!$this->_isLinux()) {
            die("Error environment: Queue Listener requires Linux OS, you could use `work` or `single` instead.");
        }
        
        // Pre-work check
        if (!method_exists($this, 'handleListen'))
            throw new Exception("You need to declare `handleListen()` method in your worker controller.", 500);
        if (!method_exists($this, 'handleWork'))
            throw new Exception("You need to declare `handleWork()` method in your worker controller.", 500);
        if ($this->logPath && !file_exists($this->logPath)) {
            // Try to access or create log file
            if ($this->_log('')) {
                throw new Exception("Log file doesn't exist: `{$this->logPath}`.", 500);
            }
        }

        // INI setting
        if ($this->debug) {
            error_reporting(-1);
            ini_set('display_errors', 1);
        }
        set_time_limit(0);

        // Worker command builder
        // Be careful to avoid infinite loop by opening listener itself
        $workerAction = 'work';
        $route = $this->router->fetch_directory() . $this->router->fetch_class() . "/{$workerAction}";
        $workerCmd = "{$this->phpCommand} " . FCPATH . "index.php {$route}";

        // Static variables
        $startTime = 0;
        $workerCount = 0;
        $workingFlag = false;

        // Setting check
        $this->workerMaxNum = ($this->workerMaxNum >= 1) ? floor($this->workerMaxNum) : 1;
        $this->workerStartNum = ($this->workerStartNum <= $this->workerMaxNum) ? floor($this->workerStartNum) : $this->workerMaxNum;
        $this->workerWaitSeconds = ($this->workerWaitSeconds >= 1) ? $this->workerWaitSeconds : 10;

        while (true) {

            // Loading insurance
            sleep(0.1);
            
            // Call customized listener process, assigns works while catching true by callback return
        	$hasEvent = ($this->handleListen($this->_staticListen)) ? true : false;

            // Start works if exists
            if ($hasEvent) {  

                // First time to assign works
                if (!$workingFlag) {
                    $workingFlag = true;
                    $startTime = microtime(true);
                    $this->_log("Queue Listener - Job detect");
                    $this->_log("Queue Listener - Start dispatch");

                    if ($this->workerStartNum > 1) {
                        // Execute extra worker numbers
                        for ($i=1; $i < $this->workerStartNum ; $i++) { 
                            $workerCount ++;
                            $r = $this->_workerCmd($workerCmd, $workerCount);
                        }
                    }
                }

                // Max running worker numbers check, otherwise keeps dispatching more workers
                if ($this->workerMaxNum <= $workerCount) {

                    // Worker heath check
                    if ($this->workerHeathCheck) {
                        foreach ($this->_pidStack as $id => $pid) {
                            $isAlive = $this->_isPidAlive($pid);
                            if (!$isAlive) {
                                $this->_log("Queue Listener - Worker health check: Missing #{$id} (PID: {$pid})");
                                $r = $this->_workerCmd($workerCmd, $id);
                            }
                        }
                    }

                    sleep($this->workerWaitSeconds);
                    continue;
                }

                // Assign works
                $workerCount ++;
                // Create a worker
                $r = $this->_workerCmd($workerCmd, $workerCount);

                sleep($this->workerWaitSeconds);
                continue;
            }

            // The end of assignment (No more work), close the assignment
            if ($workingFlag) {
                $workingFlag = false;
                $workerCount = 0;
                // Clear worker stack
                $this->_pidStack = [];
                $costSeconds = number_format(microtime(true) - $startTime, 2, '.', '');
                $this->_log("Queue Listener - Job empty");
                $this->_log("Queue Listener - Stop dispatch, total cost: {$costSeconds}s");
            }
            
            // Idle
            if ($this->listenerSleep) {
                sleep($this->listenerSleep);
            }
        }
    }
    
    /**
     * Action for creating a worker 
     *
     * @param integer $id
     * @return void
     */
    public function work($id=1)
    {
        // Pre-work check
        if (!method_exists($this, 'handleWork'))
            throw new Exception("You need to declare `handleWork()` method in your worker controller.", 500);
        
        // INI setting
        if ($this->debug) {
            error_reporting(-1);
            ini_set('display_errors', 1);
        }
        set_time_limit(0);

        // Start worker
        $startTime = microtime(true);
        $pid = getmypid();
        // Print worker close
        $this->_print("Queue Worker - Create #{$id} (PID: {$pid})");

        // Call customized worker process, stops till catch false by callback return
        while ($this->handleWork($this->_staticWork)) {
            // Sleep if set
            if ($this->workerSleep) {
                sleep($this->workerSleep);
            }
            // Loading insurance
            sleep(0.1);
        }

        // Print worker close
        $costSeconds = number_format(microtime(true) - $startTime, 2, '.', '');
        $this->_print("Queue Worker - Close #{$id} (PID: {$pid}) | cost: {$costSeconds}s");

        return;
    }

    /**
     * Launcher for guaranteeing unique process
     * 
     * This launcher would launch specified process if there are no any other same process running
     * by launcher. Using this for launching `listen` could ensure there are always one listener 
     * running at the same time with repeated launch calling likes crontab, which could also ensure
     * listener process would never gone away.
     *
     * @param string $action
     * @return void
     */
    public function launch($action='listen')
    {
        // Env check
        if (!$this->_isLinux()) {
            die("Error environment: Queue Launcher requires Linux OS, you could use `work` or `single` instead.");
        }
        
        // Action check
        if (!in_array($action, ['listen', 'work'])) {
            die("Action: `{$action}` is invalid for Launcher.");
        }
        
        // Null so far
        $logPath = '/dev/null';
        
        // Action command builder
        $route = $this->router->fetch_directory() . $this->router->fetch_class() . "/{$action}";
        $cmd = "{$this->phpCommand} " . FCPATH . "index.php {$route}";

        // Check process exists
        $search = str_replace('/', '\/', $route);
        // $result = shell_exec("pgrep -f \"{$search}\""); // Lacks of display info
        // Find out the process by name
        $psCmd = "ps aux | grep \"{$search}\" | grep -v grep";
        $psInfoCmd = "ps aux | egrep \"PID|{$search}\" | grep -v grep";
        $exist = (shell_exec($psCmd)) ? true : false;

        if ($exist) {
            
            $psInfo = shell_exec($psInfoCmd);
            die("Skip: Same process `{$action}` is running: {$route}.\n------\n{$psInfo}");
        }

        // Launch by calling command
        $launchCmd = "{$cmd} > {$logPath} &";
        $result = shell_exec($launchCmd);
        $result = shell_exec($psCmd);
        $psInfo = shell_exec($psInfoCmd);
        echo "Success to launch process `{$action}`: {$route}.\nCalled command: {$launchCmd}\n------\n{$psInfo}";

        return;
    }

    /**
     * Action for activating a single listened worker
     * 
     * Single process ensures unique process running, which prevents the same  
     * 
     * The reason which this doesn't use process check method such as `ps`, `pgrep`, is that the
     * process ID or name are unrecognizable as unique for ensuring only one Single process is
     * running.
     *
     * @return void
     */
    public function single($force=false)
    {
        // Pre-work check
        if (!method_exists($this, 'handleSingle'))
            throw new Exception("You need to declare `handleSingle()` method in your worker controller.", 500);

        // Shared lock flag builder
        $lockFile = sys_get_temp_dir() 
            . "/yidas-codeiginiter-queue-worker_" 
            . str_replace('/', '_', $this->router->fetch_directory()) 
            . get_called_class()
            . '.lock';

        // Single check for process uniqueness
        if (!$force && file_exists($lockFile)) {

            $lockData = json_decode(file_get_contents($lockFile), true);
            // Check expires time
            if (isset($lockData['expires_at']) && time() <= $lockData['expires_at']) {
                die("Single is already running: {$lockFile}\n");
            }
        }

        // Start Single - Set identified lock
        // Close Single - Release identified lock
        register_shutdown_function(function() use ($lockFile) {
            @unlink($lockFile);
        });

        // Create lock file
        $this->_singleUpdateLock($lockFile);

        // Call customized worker process, stops till catch false by callback return
        while ($this->handleSingle($this->_staticSingle)) {

            // Sleep if set
            if ($this->singleSleep) {
                sleep($this->singleSleep);
            }

            // Refresh lock file
            $this->_singleUpdateLock($lockFile);
        }
    }

    /**
     * Set static listener object for callback function
     * 
     * This is a optional method with object injection instead of assigning and
     * accessing properties.
     *
     * @param object $object
     * @return self
     */
    protected function setStaticListen($object)
    {
        $this->_staticListen = $object;
        
        return $this;
    }

    /**
     * Set static worker object for callback function
     *  
     * This is a optional method with object injection instead of assigning and
     * accessing properties.
     *
     * @param object $object
     * @return self
     */
    protected function setStaticWork($object)
    {
        $this->_staticWork = $object;
        
        return $this;
    }

    /**
     * Set static single object for callback function
     *  
     * This is a optional method with object injection instead of assigning and
     * accessing properties.
     *
     * @param object $object
     * @return self
     */
    protected function setStaticSingle($object)
    {
        $this->_staticSingle = $object;
        
        return $this;
    }

    /**
     * Single process creates or extends lock file
     * 
     * Extended second bases on sleep time and lock expiration
     *
     * @param string $lockFile
     * @return void
     */
    public function _singleUpdateLock($lockFile)
    {
        $lockData = [
            'pid' => getmypid(),
            'expires_at' => time() + $this->singleSleep + $this->singleLockTimeout,
        ];

        return file_put_contents($lockFile, json_encode($lockData));
    }

    /**
     * Command for creating a worker
     *
     * @param string $workerCmd
     * @param integer $workerCount
     * @return string Command result
     */
    protected function _workerCmd($workerCmd, $workerCount)
    {
        // Shell command builder
        $cmd = "{$workerCmd}/{$workerCount}";
        $cmd = ($this->logPath) ? "{$cmd} >> {$this->logPath}" : $cmd;

        // Process handler
        $process = proc_open("{$cmd} &", self::$_procDescriptorspec, $pipe);
        // Find out worker command's PID
        $status = proc_get_status($process);
        $pid = $status['pid'] + 1;
        // Stack workers
        $this->_pidStack[$workerCount] = $pid;
        // Close
        proc_close($process);

        // Log
        $time = date("Y-m-d H:i:s");
        $this->_log("Queue Listener - Dispatch Worker #{$workerCount} (PID: {$pid})");

        return true;
    }

    /**
     * Log to file
     *
     * @param string $textLine
     * @param string Specified log file path
     * @return integer|boolean The number of bytes that were written to the file, or FALSE on failure.
     */
    protected function _log($textLine, $logPath=null)
    {
        // Return back to console also
        $this->_print($textLine);
        
        $logPath = ($logPath) ? $logPath : $this->logPath;

        if ($logPath)
            return file_put_contents($logPath, $this->_formatTextLine($textLine), FILE_APPEND);
        else
            return false;
    }

    /**
     * Print (echo)
     *
     * @param string $textLine
     * @return void
     */
    protected function _print($textLine)
    {
        echo $this->_formatTextLine($textLine);
    }

    /**
     * Format output text line
     *
     * @param string $textLine
     * @return void
     */
    protected function _formatTextLine($textLine)
    {
        return $textLine = date("Y-m-d H:i:s") . " - {$textLine}" . PHP_EOL;
    }

    /**
     * Check if PID is alive or not
     *
     * @param integer Process ID
     * @return boolean
     */
    protected function _isPidAlive($pid)
    {
        return ((function_exists('posix_getpgid') && posix_getpgid($pid)) || file_exists("/proc/{$pid}")) ? true : false;
    }

    /**
     * Check if OS is Linux 
     *
     * @return boolean
     */
    protected function _isLinux()
    {
        // Just make sure that it's not Windows
        return (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') ? false : true;
    }

    /**
     * Listener callback function for overriding
     *
     * @param object Listener object for optional
     * @return boolean Return true if has work
     */
    /*
    protected function handleListen($static)
    {
        // Override this method
        
        return false;
    }
    */

    /**
     * Worker callback function for overriding
     *
     * @param object Worker object for optional
     * @return boolean Return false to stop work
     */
    /*
    protected function handleWork($static)
    {
        // Override this method
        
        return false;
    }
    */

    /**
     * Single callback function for overriding
     *
     * @param object Single object for optional
     * @return boolean Return false to stop work
     */
    /*
    protected function handleSingle($static)
    {
        // Override this method
        
        return false;
    }
    */
}