## ==============================================================================
## Split Jobs PowerShell Script
## Run commands in multiple concurrent pipelines
## <http://www.jansveld.net/powershell/2008/12/split-job-093/>
##
## When you need to run a simple process or gather (WMI) data from many machines,
## you need a lot of patience. Or, you can divide and conquer using multiple
## PowerShell runspaces. There are many ingenious scripts available on the web
## that allow us to launch and manage background processes (even for PS v1). For
## my purposes, I found the necessary inspiration in a blog post by Gaurhoth.
## His New-TaskPool script allows us to run multiple instances of a script
## block concurrently. Very cool!
##
## The following script is a little different: it pipes the data to each
## pipeline through a shared thread-safe queue rather than starting a new
## pipeline for each input object. This reduces overhead and allows scripts that
## have a begin/process/end block to run more efficiently, since their begin and
## end blocks run once per pipeline instead of once per object.
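##
## Here is a minimal stand-alone sketch of the idea; the item values are
## placeholders. Every call on a synchronized queue is locked, so several
## runspaces can take items from it safely. Checking Count and then calling
## Dequeue() are two separate calls, though. Another runspace may take the last
## item in between, so the loop traps the resulting InvalidOperationException:
##
##     $Queue = [Collections.Queue]::Synchronized([Collections.Queue]@('A','B','C'))
##     $Queue.IsSynchronized    # True
##     & {
##         trap {continue}
##         while ($Queue.Count) {$Queue.Dequeue()}
##     }                        # outputs A, B and C, in that order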
##
## For instance, take this simple data gathering exercise:
##
##     Get-Content machines.txt | foreach {Get-WmiObject Win32_ComputerSystem -ComputerName $_} | Export-Csv ComputerInfo.csv
##
## If you have a few hundred machines, this can take forever (especially if some
## machines are offline). Now replace the foreach alias with the Split-Job
## function:
##
##     Get-Content machines.txt | Split-Job {Get-WmiObject Win32_ComputerSystem -ComputerName $_} | Export-Csv ComputerInfo.csv
##
## By default it creates 10 runspaces (see the MaxPipelines parameter) and runs
## the WMI queries concurrently, so this should be close to 10x faster. Even if
## one of the pipelines stalls, the others will keep going. If you already have
## a data gathering script that accepts pipeline input, you can just drop
## Split-Job in:
##
##     Get-Content machines.txt | Split-Job .\MachineReport.ps1 | Export-Csv MachineReport.csv
##
## Note that the position of Split-Job in the pipeline matters. The command
## preceding it should be quick, e.g. reading objects from a text file, AD, SQL,
## etc., because Split-Job collects all of its input before it starts any
## runspaces.
##
## This is a work in progress and I will post more about this in the following
## weeks. In the meantime, comments and suggestions are welcome!
##
##
## Here is an update to the Split-Job function. Based in part on some of the
## comments on the previous version, I made the following changes:
##
## The format for the script block has changed. This was done to make it more
## straightforward to specify parameters for those commands/scripts that accept
## pipeline input. If you need a foreach (%), you will have to include it in the
## script block yourself.
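##
## In other words, each runspace dequeues the input objects and pipes them into
## your script block. If your command refers to each object as $_, it therefore
## has to be wrapped in % {}. A simplified view of what a runspace effectively
## executes for the second example below:
##
##     & { while ($SplitJobQueue.Count) {$SplitJobQueue.Dequeue()} } |
##         % {Get-WmiObject Win32_ComputerSystem -ComputerName $_}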
##
## Examples:
##
##     "Server1","Server2","Server3" | Split-Job { c:\test.ps1 -Force }
##
##     "Server1","Server2","Server3" | Split-Job { % {Get-WmiObject Win32_ComputerSystem -ComputerName $_}}
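##
## The number of concurrent runspaces defaults to 10 and can be changed with the
## MaxPipelines parameter. Split-Job never starts more runspaces than it has
## input objects. For example, to query up to 20 machines at a time:
##
##     Get-Content machines.txt | Split-Job { % {Get-WmiObject Win32_ComputerSystem -ComputerName $_}} -MaxPipelines 20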
##
## You can now import your profile, variables and/or aliases into the runspaces
## (the UseProfile, Variable and Alias parameters). This is somewhat of an
## experiment; I am not convinced this is even a good idea. Please give me your
## feedback if you think this is useful. Each runspace will have its current
## directory ($PWD) set to that of the main runspace. There is also some error
## handling code to make the script more robust.
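##
## For example, to pass a credential from the current session into every
## runspace, you could use the following. $Cred is just an illustrative variable
## name. The Variable parameter takes names without the $, and so do Function
## and Alias:
##
##     $Cred = Get-Credential
##     Get-Content machines.txt |
##         Split-Job { % {Get-WmiObject Win32_OperatingSystem -ComputerName $_ -Credential $Cred}} -Variable Cred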
##
## Enjoy!
## Arnoud
## ==============================================================================

#requires -version 1.0
################################################################################
## Run commands in multiple concurrent pipelines
##   by Arnoud Jansveld - http://www.jansveld.net/powershell/2008/12/split-job-093/
## Version History
## 0.93   Improve error handling: errors originating in the Scriptblock now
##        have more meaningful output
##        Show additional info in the progress bar (thanks Stephen Mills)
##        Add SnapIn parameter: imports (registered) PowerShell snapins
##        Add Function parameter: imports functions
##        Add SplitJobRunSpace variable; allows scripts to test if they are
##        running in a runspace
##        Add seconds remaining to progress bar (experimental)
## 0.92   Add UseProfile switch: imports the PS profile
##        Add Variable parameter: imports variables
##        Add Alias parameter: imports aliases
##        Restart pipeline if it stops due to an error
##        Set the current path in each runspace to that of the calling process
## 0.91   Revert to v0.8 input syntax for the script block
##        Add error handling for empty input queue
## 0.9    Add logic to distinguish between scriptblocks and cmdlets or scripts:
##        if a ScriptBlock is specified, a foreach {} wrapper is added
## 0.8    Add a progress bar
## 0.7    Stop adding runspaces if the queue is already empty
## 0.6    First version. Inspired by Gaurhoth's New-TaskPool script
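##
## A script can test the SplitJobRunSpace variable to find out whether it is
## running inside one of the runspaces created by Split-Job. Outside of them the
## variable is normally not set. As an illustration:
##
##     if ($SplitJobRunSpace) {'Split-Job runspace'} else {'main session'}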
################################################################################

function Split-Job {
    param (
        $Scriptblock = $(throw 'You must specify a command or script block!'),
        [int]$MaxPipelines=10,
        [switch]$UseProfile,
        [string[]]$Variable,
        [string[]]$Function = @(),
        [string[]]$Alias = @(),
        [string[]]$SnapIn
    )

    function Init ($InputQueue){
        # Create the shared thread-safe queue and fill it with the input objects
        $Queue = [Collections.Queue]::Synchronized([Collections.Queue]@($InputQueue))
        $QueueLength = $Queue.Count
        # Do not create more runspaces than input objects
        if ($MaxPipelines -gt $QueueLength) {$MaxPipelines = $QueueLength}
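        # Create the script to be run by each runspace. It receives the shared
        # queue as its only input object, dequeues items until the queue is empty
        # (the trap covers another runspace taking the last item first) and pipes
        # them into the user's command, which is appended as text. The template is
        # a single-quoted here-string because a scriptblock literal cannot end
        # with an empty pipe element.
        $Script  = "Set-Location '$PWD'; "
        $Script += @'
$SplitJobQueue = $($Input)
& {
    trap {continue}
    while ($SplitJobQueue.Count) {$SplitJobQueue.Dequeue()}
} |
'@ + $Scriptblock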

        # Create an array to keep track of the set of pipelines
        $Pipelines = New-Object System.Collections.ArrayList

        # Collect the functions and aliases to import
        $ImportItems = ($Function -replace '^','Function:') +
            ($Alias -replace '^','Alias:') |
            Get-Item | select PSPath, Definition
        $stopwatch = New-Object System.Diagnostics.Stopwatch
        $stopwatch.Start()
    }

    function Add-Pipeline {
        # This creates a new runspace and starts an asynchronous pipeline with our script.
        # It will automatically start processing objects from the shared queue.
        $Runspace = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace($Host)
        $Runspace.Open()
        $Runspace.SessionStateProxy.SetVariable('SplitJobRunSpace', $True)

        function CreatePipeline {
            param ($Data, $Scriptblock)
            $Pipeline = $Runspace.CreatePipeline($Scriptblock)
            if ($Data) {
                $Null = $Pipeline.Input.Write($Data, $True)
                $Pipeline.Input.Close()
            }
            $Null = $Pipeline.Invoke()
            $Pipeline.Dispose()
        }

        # Optionally import profile, variables, functions and aliases from the main runspace
        if ($UseProfile) {
            CreatePipeline -Script "`$PROFILE = '$PROFILE'; . `$PROFILE"
        }
        if ($Variable) {
            foreach ($var in (Get-Variable $Variable -Scope 2)) {
                trap {continue}
                $Runspace.SessionStateProxy.SetVariable($var.Name, $var.Value)
            }
        }
        if ($ImportItems) {
            CreatePipeline $ImportItems {
                foreach ($item in $Input) {New-Item -Path $item.PSPath -Value $item.Definition}
            }
        }
        if ($SnapIn) {
            CreatePipeline (Get-PSSnapin $Snapin -Registered) {$Input | Add-PSSnapin}
        }
        $Pipeline = $Runspace.CreatePipeline($Script)
        $Null = $Pipeline.Input.Write($Queue)
        $Pipeline.Input.Close()
        $Pipeline.InvokeAsync()
        $Null = $Pipelines.Add($Pipeline)
    }

    function Remove-Pipeline ($Pipeline) {
        # Remove a pipeline and runspace when it is done
        $Pipeline.RunSpace.Close()
        $Pipeline.Dispose()
        $Pipelines.Remove($Pipeline)
    }

    # Main
    # Initialize the queue from the pipeline
    . Init $Input
    # Start the pipelines
    while ($Pipelines.Count -lt $MaxPipelines -and $Queue.Count) {Add-Pipeline}

    # Loop through the runspaces and pass their output to the main pipeline
    while ($Pipelines.Count) {
        # Only update the progress bar once a second
        if (($stopwatch.ElapsedMilliseconds - $LastUpdate) -gt 1000) {
            $Completed = $QueueLength - $Queue.Count - $Pipelines.count
            $LastUpdate = $stopwatch.ElapsedMilliseconds
            $SecondsRemaining = $(if ($Completed) {
                (($Queue.Count + $Pipelines.Count)*$LastUpdate/1000/$Completed)
            } else {-1})
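            # SecondsRemaining ~ (items still queued + pipelines still running)
            #                    * elapsed seconds / items completed
            # e.g. QueueLength = 100, 80 queued, 10 running, so Completed = 10,
            # after 20 s (LastUpdate = 20000): 90 * 20000 / 1000 / 10 = 180 seconds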
            Write-Progress 'Split-Job' ("Queues: $($Pipelines.Count)  Total: $($QueueLength)  " +
            "Completed: $Completed  Pending: $($Queue.Count)")  `
            -PercentComplete ([Math]::Max((100-[Int]($Queue.Count+$Pipelines.Count)/$QueueLength*100),0)) `
            -CurrentOperation "Next item: $(trap {continue}; if ($Queue.Count) {$Queue.Peek()})" `
            -SecondsRemaining $SecondsRemaining
        }
        foreach ($Pipeline in @($Pipelines)) {
            if ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline ) {
                $Pipeline.Output.NonBlockingRead()
                $Pipeline.Error.NonBlockingRead() | Out-Default
            } else {
                # Pipeline has stopped; if there was an error show info and restart it
                if ($Pipeline.PipelineStateInfo.State -eq 'Failed') {
                    $Pipeline.PipelineStateInfo.Reason.ErrorRecord |
                        Add-Member NoteProperty writeErrorStream $True -PassThru |
                            Out-Default
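                    # Restart the pipeline in a new runspace, but only if items have
                    # been taken from the queue; if nothing was consumed, the command
                    # itself is probably failing and restarting would loop forever
                    if ($Queue.Count -lt $QueueLength) {Add-Pipeline}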
                }
                Remove-Pipeline $Pipeline
            }
        }
        Start-Sleep -Milliseconds 100
    }
}