ColdBox Promises, Executors, Async programming and Parallel Computations
ColdBox 6 introduces asynchronous and parallel programming using Futures and Executors for ColdFusion (CFML). We leverage the entire arsenal in the JDK to bring you a wide array of features for your applications: asynchronous pipelines, parallel workloads, work queues, and scheduled tasks.
YOU DON'T NEED COLDBOX TO RUN ANY SCHEDULED TASKS OR ANY FEATURES OF THE ASYNC PACKAGE. YOU CAN USE ANY OF THE STANDALONE LIBRARIES BY USING CACHEBOX, WIREBOX OR LOGBOX STANDALONE.
Our async package, coldbox.system.async, is also available for all the standalone libraries: WireBox, CacheBox, and LogBox. This means that you can use the async capabilities in ANY ColdFusion (CFML) application, not only ColdBox HMVC applications.
We leverage Java Executors, CompletableFutures, and many more classes from the concurrent packages in the JDK: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/package-summary.html
We have created a full sample gallery that we use in our live sessions and trainings. It contains tons of samples you can run and learn from: https://github.com/lmajano/to-the-future-with-cbFutures
We have created a manager for leveraging all the async/parallel capabilities. We lovingly call it the ColdBox AsyncManager. From this manager you will be able to create async pipelines, simple futures, executors and much more.
A ColdBox future is used for async/parallel programming where you can register a task or multiple tasks that will execute in a non-blocking approach and trigger dependent computations, which could also be asynchronous. This Future object can then be used to monitor the execution of the task and create rich completion/combining pipelines upon the results of such tasks. You can still use a blocking get() operation, but that is an overly simplistic approach to async programming because you are ultimately blocking to get the result.
ColdBox futures are backed by Java's CompletableFuture API, so most of what applies there applies here as well; even Java developers will feel at home. It will allow you to create rich pipelines for creating multiple Futures, chaining, composing and combining results.
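Because ColdBox futures are backed by CompletableFuture, the underlying JDK behavior can be sketched directly in Java. This is a minimal illustration of the JDK mechanism, not ColdBox's implementation; the class and method names (PipelineSketch, buildPipeline) are hypothetical. It registers dependent stages without blocking and only calls join() at the very end to read the result.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the JDK mechanism behind a future pipeline.
class PipelineSketch {

    // Builds a non-blocking pipeline: supply a value, then transform it twice.
    static CompletableFuture<String> buildPipeline() {
        return CompletableFuture
            .supplyAsync(() -> 21)            // runs asynchronously (common ForkJoin pool by default)
            .thenApply(n -> n * 2)            // dependent stage, runs once the value is ready
            .thenApply(n -> "answer=" + n);   // another chained stage
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = buildPipeline();
        // join() blocks the caller; it is used here only to show the final value.
        System.out.println(future.join());
    }
}
```

Nothing in buildPipeline() waits for a result; the caller decides if and when to block.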
You might be asking yourself: why should I leverage ColdBox futures instead of traditional cfthreads or even the CFML engine's runAsync()? Let's start with the first issue, using ColdBox futures instead of cfthread.
cfthread vs ColdBox Futures
cfthreads are an oversimplified approach to async computations. They allow you to spawn a thread backed by a Java Runnable and then either wait or not for it to complete. You must then use the thread scope or other scopes to move and share data, and it can get out of hand very quickly. Here are some issues:
Too over-simplistic
Threads limited on creation
Cannot be completed manually
No concept of a completion stage pipeline
No control of what executor runs the task
No way to trap the exceptions and recover
No way to do parallel computations with futures
No way to get a result from the computation, except by using a shared scope
You must track, name and pull information from the threads
etc.
You get the picture. They exist, but they are not easy to deal with and the API for managing them is poor.
runAsync() vs ColdBox Futures
ColdFusion 2018 and Lucee 5 have both introduced async programming via their runAsync() function. Lucee can also execute collections in parallel via the each(), map(), and filter() operations. However, their implementations leave much to be desired. Here is a list of deficiencies in their current implementations:
Backed by a custom wrapper to java.util.concurrent.Future and not CompletableFuture
Simplistic error handler with no way to recover or continue executing pipelines after an exception
No way to choose or reuse the executor to run the initial task in
No way to choose or reuse the executor to run the subsequent then() operations. Lucee actually creates a new singleThreadExecutor() for EVERY then() operation.
No way to operate on multiple futures at once
No way to have one future win against multiple future operations
No way to combine futures
No way to compose futures
No ability to schedule tasks
No ability to run periodic tasks
No ability to delay the execution of tasks
Only works with closures, does not work on actually calling component methods
And so much more
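Two of the gaps listed above, recovering from an exception and choosing the executor for later stages, are things the JDK's CompletableFuture supports directly. The following Java sketch illustrates that JDK behavior under stated assumptions: RecoverySketch and recoverAndContinue are hypothetical names, and this is not ColdBox's own code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: recover from a failure and keep the pipeline running.
class RecoverySketch {

    // The initial task fails, the pipeline recovers with a fallback value,
    // and the next stage runs on the executor chosen by the caller.
    static String recoverAndContinue(ExecutorService executor) {
        return CompletableFuture
            .<Integer>supplyAsync(() -> { throw new IllegalStateException("boom"); }, executor)
            .exceptionally(ex -> -1)                          // trap the exception and recover
            .thenApplyAsync(n -> "recovered:" + n, executor)  // later stage reuses the same executor
            .join();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            System.out.println(recoverAndContinue(executor));
        } finally {
            executor.shutdown();
        }
    }
}
```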
All of our futures execute in the server's common ForkJoin pool that the JDK provides. However, since version 8 the JDK also provides a framework for simplifying the execution of asynchronous tasks. It can automatically provide you with a pool of threads and a simple API for assigning tasks or workloads to them. We have bridged the gap between Java and ColdFusion and now allow you to leverage all the functionality of the framework in your applications. You can create many types of executors and customized thread pools for your workloads to use.
The manager will be registered in WireBox as AsyncManager@ColdBox or can be retrieved from the ColdBox main controller: controller.getAsyncManager().
The super type has a new async() method that returns the instance of the AsyncManager so you can execute async/parallel operations as well.
To The Future with ColdBox Futures
You will be able to create async pipelines and futures by using the following AsyncManager creation methods:
init( value, executor, debug, loadAppContext ): Construct a new future. The value argument can be the future closure/udf. You can also pass in a custom executor and some utility flags.
newFuture( [task], [executor] ):Future : Returns a ColdBox Future. You can pass an optional task (closure/udf) and even an optional executor.
newCompletedFuture( value ):Future : Returns a new future that is already completed with the given value.
Please note that some of the methods above will return a ColdBox Future object that is backed by Java's CompletableFuture (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html)
Here are the method signatures for the methods above:
There are two ways to start async computations with futures:
Via the newFuture() constructor
Via the run() method
The constructor is the shortcut approach and only allows closures to be defined as the task. The run() method allows you to pass a CFC instance and a method name, which will then be called as the initial computation.
Here are the run() method signatures:
Please note that the majority of methods take in an executor that you can supply. This means that you can decide in which thread pool the task will execute; by default it will run in the ForkJoinPool or the same thread the computation started from.
WARNING: Once you pass a closure/udf or cfc/method to the run() methods or the constructor, the JDK will create and send the task for execution to the appropriate executor. You do not need to start the thread or issue a start command. It is implied.
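The two points above (the caller picks the executor, and the task is submitted immediately) mirror how CompletableFuture.supplyAsync() behaves in the JDK. The Java sketch below illustrates that; ExecutorChoiceSketch, runOn and the thread name my-pool-worker are hypothetical names chosen for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: the caller decides which thread pool runs the task.
class ExecutorChoiceSketch {

    // Returns the name of the thread that ran the task on the supplied executor.
    static String runOn(ExecutorService executor) {
        // The task is submitted as soon as supplyAsync() is called; there is no start() call.
        CompletableFuture<String> future =
            CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), executor);
        return future.join();
    }

    public static void main(String[] args) {
        // A thread factory that names its threads so the pool is recognizable.
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> new Thread(r, "my-pool-worker"));
        try {
            System.out.println(runOn(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```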
The loadAppContext argument is a boolean flag that allows you to load the ColdFusion (CFML) application context into the running threads. By default, this is needed if your threads will require certain things from the application context: mappings, app settings, etc. However, sometimes this can cause issues and slowdowns, so be careful when deciding whether to load the context. As a rule of thumb, do NOT load it if you are doing pure computations or do not require mappings or app settings.
There are also times when you need a future to be completed immediately. For this you can build an already completed future via newCompletedFuture() or by leveraging the complete() function in the Future object. This also allows you to complete the future with an initial value if you want to.
Completed futures are great for mocking and testing scenarios
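In JDK terms, these two options correspond to CompletableFuture.completedFuture() and complete(). The following Java sketch shows both; the class and method names are hypothetical and it illustrates the JDK API only.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: futures that are completed without running a task.
class CompletedFutureSketch {

    // A future that is born completed, handy as a stub in tests.
    static CompletableFuture<String> alreadyDone() {
        return CompletableFuture.completedFuture("cached-value");
    }

    // Completes an empty future by hand; complete() returns true only for the first call.
    static boolean[] completeManually(CompletableFuture<String> future) {
        boolean first = future.complete("first");
        boolean second = future.complete("second"); // ignored, the future is already completed
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        System.out.println(alreadyDone().isDone());
        CompletableFuture<String> manual = new CompletableFuture<>();
        completeManually(manual);
        System.out.println(manual.join());
    }
}
```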
Many of the methods in the JDK's CompletableFuture are implemented in ColdBox Futures. We have also added several new methods that play very nicely with our dynamic language. Here is a collection of the currently implemented methods.
The ColdBox AsyncManager will allow you to register and manage different types of executors that can execute your very own tasks! Each executor acts as a singleton and can be configured uniquely.
You can also create executors on the fly so your async futures can use them as well for ONLY that execution stage.
The types that we currently support are:
fixed: By default it will build one with 20 threads on it. Great for multiple task execution and worker processing.
single: A great way to ensure that submitted tasks execute in the order of submission, much like a FIFO queue (First In First Out).
cached: An unbounded pool where the number of threads will grow according to the tasks it needs to service. Unused threads are killed after a default 60 second timeout and the pool shrinks back to 0.
scheduled: A pool to use for scheduled tasks that can run one time or periodically. The default scheduled task queue has 20 threads for processing.
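The four types map naturally onto the factory methods of the JDK's java.util.concurrent.Executors class. The Java sketch below is an illustration under that assumption (the mapping comments and the names ExecutorTypesSketch and runInOrder are ours, not taken from the ColdBox source); it also shows the FIFO guarantee of a single-thread executor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the JDK executor flavors.
class ExecutorTypesSketch {

    // Submits numbered tasks and records the order in which they actually ran.
    static List<Integer> runInOrder(ExecutorService executor, int taskCount) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            executor.execute(() -> order.add(id));
        }
        executor.shutdown(); // stop accepting work, let queued tasks finish
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(20);          // assumed match for "fixed"
        ExecutorService single = Executors.newSingleThreadExecutor();      // assumed match for "single"
        ExecutorService cached = Executors.newCachedThreadPool();          // assumed match for "cached"
        ExecutorService scheduled = Executors.newScheduledThreadPool(20);  // assumed match for "scheduled"

        // Only the single-thread executor guarantees execution in submission order.
        System.out.println(runInOrder(single, 5));

        fixed.shutdown();
        cached.shutdown();
        scheduled.shutdown();
    }
}
```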
Here are the methods you can use for registering and managing singleton executors in your application:
newExecutor(): Create and register an executor according to the passed arguments. If the executor exists, just return it.
newScheduledExecutor(): Create a scheduled executor according to the passed arguments. Shortcut to newExecutor( type: "scheduled" )
newSingleExecutor(): Create a single thread executor. Shortcut to newExecutor( type: "single", threads: 1 )
newCachedExecutor(): Create a cached thread executor. Shortcut to newExecutor( type: "cached" )
getExecutor(): Get an executor registered in this async manager
getExecutorNames(): Get the array of registered executor names in the system
hasExecutor(): Verify if an executor exists
deleteExecutor(): Delete an executor from the registry. If the executor has not shut down, it will be shut down for you using the shutdownNow() method
shutdownExecutor(): Shut down an executor or force it to shut down. You can also do this from the executor itself. If an unregistered executor name is passed, it will be ignored
shutdownAllExecutors(): Shut down all registered executors in the system
getExecutorStatusMap(): Returns a structure of status maps for every registered executor in the manager. This is composed of many stats about each executor.
And here are the full signatures:
If you just want to use executors for different completion stages and then discard them, you can easily do so by using the $executors public component in the AsyncManager. This object can be found at: coldbox.system.async.util.Executors
The available creation methods are:
This way you can use them and discard them upon further processing.
Every ColdBox application will register a task scheduler called coldbox-tasks which ColdBox internal services can leverage for tasks and schedules. It is configured with 20 threads and uses the scheduled type. You can leverage it for your own tasks as well.
We have also added a new configuration struct called executors which you can use to register global executors or per-module executors.
Each executor registration is done as a struct with the name of the executor and at most two settings:
type: The executor type you want to register
threads: The number of threads to assign to the executor
You can also do the same at a per-module level in your module's ModuleConfig.cfc.
ColdBox will register your executors upon startup and un-register and shut them down when the module is unloaded.
We have extended WireBox so you can inject registered executors using the following DSL: executors:{name}
For example, the ColdBox file logger uses futures and has no app context loaded, since it does not require it. It only monitors a log queue and streams the content to a file. Remember, cohesion and encapsulation. The purer your computations are, the better and safer they will be.
Always check out the API docs for the latest methods and signatures.
Please note that each executor is unique in its way of operation, so make sure you read about each type in the JavaDocs.
Method | Returns | Description |
all() | Future | Allows for the parallel execution of many closures/futures or an array of closures/futures. |
allApply() | Collection | Allows you to apply a function to every element of a collection: array or struct and then reconstructing the collection according to your changes. A parallel map() |
anyOf() | Future | Allows for the parallel execution of many closures/futures or an array of closures/futures, but only returns the FASTEST future that completes. The race is on! |
cancel() | Boolean | If not already completed, completes this Future with a CancellationException. |
complete() | Boolean | If not already completed, sets the value returned by get() and related methods to the given value. |
completedFuture() | Future | Returns a new ColdBox Future that is already completed with the given value. |
completeExceptionally() | Future | If not already completed, causes invocations of get() and related methods to throw the given exception. The exception type is of |
exceptionally() onException() | Future | Register an event handler for any exceptions that happen before it is registered in the future pipeline. Whatever this function returns, will be used for the next registered functions in the pipeline. |
get() | any | Waits if necessary for at most the given time for this future to complete, and then returns its result, if available. |
getNative() | Java CompletableFuture | Get the native Java CompletableFuture |
getNow() | any | Returns the result value (or throws any encountered exception) if completed, else returns the given defaultValue. |
isCancelled() | Boolean | Flag that checks if the computation has been cancelled |
isCompletedExceptionally() | Boolean | Flag that checks if the computation threw an exception |
isDone() | Boolean | Flag that checks if the computation has finished |
run() runAsync() supplyAsync() | Future | Executes a runnable closure or component method via Java's CompletableFuture and gives you back a ColdBox Future: |
then() thenApply() | Future | Executed once the computation has finalized and a result is passed in to the target |
thenAsync() thenApplyAsync() | Future | Executed once the computation has finalized and a result is passed in to the target, but this will execute in a separate thread. By default it uses the common ForkJoin pool. |
thenCombine() | Future | This is used when you want two Futures to run independently and do something after both are complete. |
thenCompose() | Future | Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function. |
withTimeout() | Future | Ability to attach a timeout to the execution of the allApply() method |
Here are some of the methods that will allow you to do parallel computations. Please note that the AsyncManager has shortcuts to these methods, but we always recommend using them via a new future, because then you can have further constructor options like a custom executor, debugging, loading the CFML context and much more.
all( a1, a2, ... ):Future : This method accepts any number of future objects, closures or an array of closures/futures in order to execute them in parallel. It returns a future that, when you call get() on it, will retrieve an array of the results of all the operations.
allApply( items, fn, executor ):array : This function can accept an array of items or a struct of items of any type and apply a function to each of the items in parallel. The fn argument receives the appropriate item and must return a result. Consider this a parallel map() operation.
anyOf( a1, a2, ... ):Future : This method accepts any number of future objects, closures or an array of closures/futures and will execute them in parallel. However, instead of returning all of the results in an array like all(), this method will return the future that executes the fastest! Race Baby!
withTimeout( timeout, timeUnit ) : Apply a timeout to all() or allApply() operations. The timeUnit can be: days, hours, microseconds, milliseconds, minutes, nanoseconds, and seconds. The default is milliseconds.
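To make the semantics of all(), allApply() and anyOf() concrete, here is a rough Java sketch built on CompletableFuture.allOf() and anyOf(). It is an approximation under stated assumptions: the helper names (all, squareAll, fastest) are hypothetical, and it does not reproduce ColdBox's implementation or its timeout handling.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical sketch of parallel computations on top of CompletableFuture.
class ParallelSketch {

    // Rough equivalent of all(): wait for every future, then gather the results in order.
    static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)   // safe: every future is already complete here
                .collect(Collectors.toList()));
    }

    // Rough equivalent of allApply(): apply a function to each item in parallel (a parallel map).
    static List<Integer> squareAll(List<Integer> items) {
        List<CompletableFuture<Integer>> futures = items.stream()
            .map(item -> CompletableFuture.supplyAsync(() -> item * item))
            .collect(Collectors.toList());
        return all(futures).join();
    }

    // Rough equivalent of anyOf(): the first future to complete wins.
    static Object fastest(CompletableFuture<?>... futures) {
        return CompletableFuture.anyOf(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(squareAll(Arrays.asList(1, 2, 3)));
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completes
        CompletableFuture<String> done = CompletableFuture.completedFuture("winner");
        System.out.println(fastest(pending, done));
    }
}
```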
Please note that some of the methods above will return a ColdBox Future object that is backed by Java's CompletableFuture (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html)
Here are the method signatures for the methods above, which you can call from the AsyncManager or a newly created future.
Here are some examples:
Please also note that you can choose your own Executor for the parallel computations by passing the executor via the newFuture() method.
The async package is what powers scheduled tasks and it is available to any CFML application through any of our standalone libraries and frameworks: CacheBox, LogBox, and WireBox.
YOU DON'T NEED COLDBOX TO RUN ANY SCHEDULED TASKS OR ANY FEATURES OF THE ASYNC PACKAGE. YOU CAN USE ANY OF THE STANDALONE LIBRARIES ABOVE.
However, if you use ColdBox, you get enhanced features and new functionality. For example, the ColdBox Scheduled Tasks are an enhanced implementation of the core scheduled tasks we will be reviewing in this document.
The async package offers you the ability to schedule tasks and workloads via the Scheduled Executors that you can register in the async manager. We also provide you with a lovely Scheduler class that can keep track of all the tasks you would like to execute in a ScheduledExecutor. In essence, you have two options when scheduling tasks:
Scheduler Approach: Create a scheduler and register tasks in it
Scheduled Executor Approach: Create a ScheduledExecutor and send task objects into it
With our scheduled tasks you can run either one-off tasks or periodic tasks.
To create a new scheduler you can call the AsyncManager's newScheduler( name ) method. This will create a new coldbox.system.async.tasks.Scheduler object with the name you pass. It will also create a ScheduledExecutor for you inside the scheduler with the default thread count. It is then your responsibility to persist that scheduler so you can use it throughout your application.
Once you have an instance of that scheduler you can begin to register tasks on it. Once all tasks have been registered you can use the startup() method to start the tasks and the shutdown() method to shut down all tasks and the linked executor.
The name of the ScheduledExecutor will be {schedulerName}-scheduler
The following methods are used to impact the operation of all scheduled tasks managed by the scheduler:
By default, all tasks run under the system default timezone, which usually is UTC. However, if you would like to change to a different execution timezone, then you can use the setTimeZone() method and pass in a valid timezone string:
You can find all valid time zone IDs here: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/ZoneId.html
Remember that some timezones utilize daylight savings time. When daylight saving time changes occur, your scheduled task may run twice or even not run at all. For this reason, we recommend avoiding timezone scheduling when possible.
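The daylight saving caveat can be reproduced with the JDK's java.time classes. The following Java sketch is illustrative only (DaylightSavingSketch is a hypothetical name and America/New_York with its 2021 transition dates is just an example zone); it shows a local time that does not exist and one that occurs twice.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical sketch: why wall-clock schedules are fragile around DST changes.
class DaylightSavingSketch {

    static final ZoneId NEW_YORK = ZoneId.of("America/New_York");

    // 02:30 on 2021-03-14 does not exist in New York (clocks jump from 02:00 to 03:00),
    // so java.time shifts the local time forward by the length of the gap.
    static ZonedDateTime springForward() {
        return ZonedDateTime.of(LocalDateTime.of(2021, 3, 14, 2, 30), NEW_YORK);
    }

    // 01:30 on 2021-11-07 happens twice in New York (clocks fall back from 02:00 to 01:00).
    static ZonedDateTime[] fallBack() {
        ZonedDateTime ambiguous = ZonedDateTime.of(LocalDateTime.of(2021, 11, 7, 1, 30), NEW_YORK);
        return new ZonedDateTime[] {
            ambiguous.withEarlierOffsetAtOverlap(),
            ambiguous.withLaterOffsetAtOverlap()
        };
    }

    public static void main(String[] args) {
        System.out.println(springForward());
        for (ZonedDateTime occurrence : fallBack()) {
            System.out.println(occurrence);
        }
    }
}
```

A task pinned to 02:30 local time would have no matching instant on the first date, and a task pinned to 01:30 would match two instants on the second.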
By default the scheduler will register a scheduled executor with a default of 20 threads for you with a name of {schedulerName}-scheduler. If you want to add in your own executor as per your configurations, then just call the setExecutor() method.
You can find how to work with executors in our executors section.
Every scheduler has the following properties available to you in the variables scope:
Every scheduler has several utility methods:
Ok, now that we have seen all the capabilities of the scheduler, let's dive deep into scheduling tasks with the task( name ) method.
Once you call this method, the scheduler will create a ColdBoxScheduledTask object for you, configure it, wire it, register it and return it to you.
You can find the API Docs for this object here: https://s3.amazonaws.com/apidocs.ortussolutions.com/coldbox/6.4.0/coldbox/system/async/tasks/ScheduledTask.html
You register the callable event via the call() method on the task object. You can register a closure/lambda or an invokable CFC. If you register an object, then we will call the object's run() method by default, but you can change it using the method argument and call any public/remote method.
There are many frequency methods in scheduled tasks that will enable the tasks at specific intervals. Every time you see that an argument receives a timeUnit, the available options are:
days
hours
minutes
seconds
milliseconds (default)
microseconds
nanoseconds
Ok, let's go over the frequency methods:
All time arguments default to midnight (00:00).
By default, all tasks that have interval rates/periods will execute on that interval schedule. However, what happens if a task takes longer to execute than the period? By default, the next execution will be triggered even if the previous one has not finished. If you want to prevent this behavior, use the withNoOverlaps() method and ColdBox will register the task with a fixed delay, meaning the interval does not start counting until the previous execution has finished.
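In the JDK's ScheduledExecutorService, the fixed-delay mode is scheduleWithFixedDelay(), where the delay is measured from the end of one run to the start of the next. The Java sketch below illustrates that JDK behavior; FixedDelaySketch and measureGaps are hypothetical names and the timings are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: fixed-delay scheduling never starts counting until a run has ended.
class FixedDelaySketch {

    // Runs a slow task `runs` times with a fixed delay and returns the idle gap (nanoseconds)
    // between the end of one run and the start of the next.
    static List<Long> measureGaps(int runs, long workMillis, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        List<Long> gaps = new ArrayList<>();   // runs never overlap, so one writer at a time
        long[] lastEnd = { -1 };
        CountDownLatch done = new CountDownLatch(runs);
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                if (done.getCount() == 0) return;          // ignore any extra run
                long start = System.nanoTime();
                if (lastEnd[0] >= 0) gaps.add(start - lastEnd[0]);
                sleep(workMillis);                         // simulate work longer than the delay
                lastEnd[0] = System.nanoTime();
                done.countDown();
            }, 0, delayMillis, TimeUnit.MILLISECONDS);
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(gaps);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        for (long gap : measureGaps(3, 80, 50)) {
            System.out.println(TimeUnit.NANOSECONDS.toMillis(gap) + " ms idle before next run");
        }
    }
}
```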
Spaced delays are a feature of the Scheduled Executors. There is even a spacedDelay( delay, timeUnit ) method in the Task object.
Every task can also have an initial delay before its first execution by using the delay() method.
The delay is numeric and the timeUnit can be:
days
hours
minutes
seconds
milliseconds (default)
microseconds
nanoseconds
Please note that the delay pushes the execution of the task into the future only for the first execution.
Apart from registering tasks that have specific intervals/frequencies, you can also register tasks that are executed ONCE ONLY. These are great for warming up caches, registering yourself with control planes, setting up initial data collections and so much more.
Basically, you don't register a frequency, just the callable event. You can also combine them with a delay of execution if you need them to fire off after a certain amount of time has passed.
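At the JDK level, a one-off task with a delay corresponds to ScheduledExecutorService.schedule(). The Java sketch below is an illustration of that call only; OneOffTaskSketch, runOnceAfter and the warm-up task are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a task that runs exactly once after an initial delay.
class OneOffTaskSketch {

    // Schedules a single run after delayMillis and returns how long it actually waited (ms).
    static long runOnceAfter(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Callable<Long> warmUp = System::nanoTime;   // stand-in for a cache warm-up task
        long submitted = System.nanoTime();
        try {
            ScheduledFuture<Long> future = scheduler.schedule(warmUp, delayMillis, TimeUnit.MILLISECONDS);
            long ranAt = future.get();              // no period was given, so it runs only once
            return TimeUnit.NANOSECONDS.toMillis(ranAt - submitted);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("waited " + runOnceAfter(100) + " ms before the single run");
    }
}
```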
We already saw that a scheduler has life-cycle methods, but a task can also have several useful life-cycle methods:
By default, all tasks will ask the scheduler for the timezone to run in. However, you can override it on a task-by-task basis using the setTimezone( timezone ) method:
You can find all valid time zone IDs here: https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/ZoneId.html
Remember that some timezones utilize daylight savings time. When daylight saving time changes occur, your scheduled task may run twice or even not run at all. For this reason, we recommend avoiding timezone scheduling when possible.
There are many ways to constrain the execution of a task. One of them is to register a when() closure that will be executed at runtime and evaluated as a boolean. If it returns true, then the task can run; otherwise it is skipped.
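The idea of a runtime guard can be sketched in plain Java by wrapping a task in a predicate check. This is only an analogy for the concept; the helper when() below is hypothetical and is not how ColdBox implements it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: a task that only runs when a predicate is true at execution time.
class WhenGuardSketch {

    // Wraps a task so the predicate is evaluated every time the task is about to run.
    static Runnable when(BooleanSupplier predicate, Runnable task) {
        return () -> {
            if (predicate.getAsBoolean()) {
                task.run();
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        boolean[] enabled = { false };
        Runnable guarded = when(() -> enabled[0], runs::incrementAndGet);

        guarded.run();          // predicate is false, the task is skipped
        enabled[0] = true;
        guarded.run();          // predicate is true, the task runs
        System.out.println("executions: " + runs.get());
    }
}
```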
Every task is runnable from registration according to the frequency you set. However, you can manually disable a task using the disable() method:
Once you are ready to enable the task, you can use the enable() method:
All tasks keep track of themselves and have lovely metrics. You can use the getStats() method to get a snapshot structure of the stats at that point in time. Here is what you get in the stats structure:
We have created some useful methods that you can use when working with asynchronous tasks:
Let's now investigate a second approach to task scheduling. We have seen the Scheduler approach, which is a self-contained object that can track multiple tasks for you and give you enhanced and fluent approaches to scheduling. However, there are times when you just want to use a ScheduledExecutor and send tasks into it, either for one-time executions or on specific frequencies, and skip the Scheduler.
Like with anything in life, there are pros and cons. The Scheduler approach will track all the scheduled future results of each task so you can see their progress and metrics and even cancel them. The executor approach is more of a fire-and-forget approach.
Let's get down to business. The first step is to talk to the AsyncManager and register a scheduled executor. You can do this using two methods:
newExecutor( name, type, threads ) - Pass by type
newScheduledExecutor( name, threads ) - Shorthand
Once you register the executor, the AsyncManager will track its persistence and you can then request it anywhere in your app via the getExecutor( name ) method or inject it using the executors injection DSL.
Now that we have a scheduled executor, we can use the newTask() method to get a ScheduledTask, configure it, and send it for execution.
As you can see, now we are in Scheduling Tasks mode, and all the docs on it apply. Several things are different in this approach:
We talk to the executor via the newTask() method to get a new ScheduledTask object
We call the start() method manually, whenever we want to send the task into scheduling
We get a ScheduledFuture result object so we can track the results of the schedule.
You can very easily create work queues with this approach by sending one-off tasks into the executors and forgetting about them. Let's say we have an app that needs to do some image processing after an image has been uploaded. We don't want to hold up (block) the calling thread with it, so we upload, send the task for processing, and return an identifier for the operation.
Remember you can set how many threads you want in an executor. It doesn't even have to be a scheduled executor; it could be a cached one which can expand and contract according to workloads.
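The work queue idea can be sketched with a plain JDK executor: submit the job, hand back an identifier immediately, and look the result up later. Everything in this Java sketch is hypothetical (ImageQueueSketch, the job map, and the string that stands in for real image processing); it illustrates the pattern, not a ColdBox API.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of a fire-and-forget work queue for uploaded images.
class ImageQueueSketch {

    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private final Map<String, Future<String>> jobs = new ConcurrentHashMap<>();

    // Queues the (simulated) image processing and returns an identifier right away.
    String enqueue(String fileName) {
        String jobId = UUID.randomUUID().toString();
        jobs.put(jobId, workers.submit(() -> "processed:" + fileName));
        return jobId;
    }

    // Blocks until the job with the given identifier has finished and returns its result.
    String resultOf(String jobId) {
        try {
            return jobs.get(jobId).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    void shutdown() {
        workers.shutdown();
    }

    public static void main(String[] args) {
        ImageQueueSketch queue = new ImageQueueSketch();
        String jobId = queue.enqueue("cat.png");      // the caller is not blocked here
        System.out.println(queue.resultOf(jobId));
        queue.shutdown();
    }
}
```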
Method | Description |
setTimezone( timezone ) | Set the timezone to use for all registered tasks |
setExecutor( executor ) | Override the executor generated for the scheduler |
Object | Description |
asyncManager | Async manager reference |
executor | Scheduled executor |
started | A boolean flag indicating if the scheduler has started or not |
tasks | The collection of registered tasks |
timezone | Java based timezone object |
util | ColdBox utility |
Method | Description |
getRegisteredTasks() | Get an ordered array of all the tasks registered in the scheduler |
getTaskRecord( name ) | Get the task record structure by name: { name, task, future, scheduledAt, registeredAt, error, errorMessage, stacktrace } |
getTaskStats() | Builds out a struct report for all the registered tasks in this scheduler |
hasTask( name ) | Check if a scheduler has a task registered by name |
hasStarted() | Has the scheduler started already |
removeTask( name ) | Cancel a task and remove it from the scheduler |
startup() | Startup the scheduler. This is called by ColdBox for you. No need to call it. |
shutdown() | Shutdown the scheduler |
task( name ) | Register a new task and return it to you so you can build it out. |
Frequency Method | Description |
every( period, timeunit ) | Run the task every custom period of execution |
spacedDelay( spacedDelay, timeunit ) | Run the task every custom period of execution but with NO overlaps |
everyMinute() | Run the task every minute from the time it gets scheduled |
everyHour() | Run the task every hour from the time it gets scheduled |
everyHourAt( minutes ) | Set the period to be hourly at a specific minute mark and 00 seconds |
everyDay() | Run the task every day at midnight |
everyDayAt( time ) | Run the task daily at a specific time in 24 hour format: HH:mm |
everyWeek() | Run the task every Sunday at midnight |
everyWeekOn( day, time ) | Run the task weekly on the given day of the week and time |
everyMonth() | Run the task on the first day of every month at midnight |
everyMonthOn( day, time ) | Run the task every month on a specific day and time |
onFirstBusinessDayOfTheMonth( time ) | Run the task on the first business day of every month |
onLastBusinessDayOfTheMonth( time ) | Run the task on the last business day of the month |
everyYear() | Run the task on the first day of the year at midnight |
everyYearOn( month, day, time ) | Run the task yearly on a specific month, day and time |
onWeekends( time ) | Run the task on Saturday and Sunday |
onWeekdays( time ) | Run the task only on weekdays at a specific time |
onMondays( time ) | Only on Mondays |
onTuesdays( time ) | Only on Tuesdays |
onWednesdays( time ) | Only on Wednesdays |
onThursdays( time ) | Only on Thursdays |
onFridays( time ) | Only on Fridays |
onSaturdays( time ) | Only on Saturdays |
onSundays( time ) | Only on Sundays |
Method | Description |
after( target ) | Store the closure to execute after the task executes: function( task, results ) |
before( target ) | Store the closure to execute before the task executes: function( task ) |
onFailure( target ) | Store the closure to execute if there is a failure running the task: function( task, exception ) |
onSuccess( target ) | Store the closure to execute if the task completes successfully: function( task, results ) |
Metric | Description |
created | The timestamp of when the task was created in memory |
inetHost | The hostname of the machine this task is registered with |
lastRun | The last time the task ran |
lastResult | The last result the task callable produced |
localIp | The IP address of the server this task is registered with |
neverRun | A boolean flag indicating if the task has NEVER been run |
nextRun | When the task will run next |
totalFailures | How many times the task has failed execution |
totalRuns | How many times the task has run |
totalSuccess | How many times the task has run and succeeded |
Method | Description |
err( var ) | Send output to the error stream |
hasScheduler() | Verifies if the task is assigned a scheduler or not |
isDisabled() | Verifies if the task has been disabled |
isConstrained() | Verifies if the task has been constrained to run by weekends, weekdays, dayOfWeek, or dayOfMonth |
out( var ) | Send output to the output stream |
start() | Kicks off the task into the scheduled executor manually. This method is called for you by the scheduler upon application startup or module loading. |