Grid Tasks And Grid Jobs
Added by architect, last edited by morpheus on Dec 23, 2008


GridTask And GridJob Interfaces

To create a grid task, you need to implement the GridTask interface. When implementing it, you also need to be aware of the GridJob interface. Together, these two interfaces define practically everything you need to know to create a grid task. In a nutshell, GridTask is responsible for splitting business logic into multiple grid jobs, receiving results from the individual grid jobs executing on remote nodes, and reducing (aggregating) the received job results into the final grid task result.

A grid task is split into jobs when the GridTask.map(List, Object) method is called. This method returns all jobs for the task, mapped to their corresponding grid nodes for execution. The grid then serializes these jobs and sends them to the requested nodes for execution.
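
The split, execute, and reduce flow described above can be sketched in plain Java with no GridGain dependency. Everything in this sketch is an assumption made for illustration: SketchTask, SketchJob, and CharCountTask are hypothetical stand-ins rather than GridGain interfaces, nodes are plain strings, jobs run locally instead of being serialized and sent anywhere, and keying the returned map by job (so that one node can receive several jobs) is a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of map/execute/reduce. All types are hypothetical stand-ins. */
final class MapReduceSketch {
    /** Stand-in for a grid job: one unit of work. */
    interface SketchJob {
        int execute();
    }

    /** Stand-in for a grid task: maps jobs to nodes, then reduces their results. */
    interface SketchTask {
        Map<SketchJob, String> map(List<String> nodes, String arg);

        int reduce(List<Integer> results);
    }

    /** Counts characters by giving each word of the argument its own job. */
    static final class CharCountTask implements SketchTask {
        @Override
        public Map<SketchJob, String> map(List<String> nodes, String arg) {
            Map<SketchJob, String> jobs = new LinkedHashMap<>();
            String[] words = arg.split(" ");

            for (int i = 0; i < words.length; i++) {
                final String word = words[i];

                SketchJob job = new SketchJob() {
                    @Override
                    public int execute() {
                        return word.length();
                    }
                };

                // Round-robin assignment; a real grid could consult a load balancer.
                jobs.put(job, nodes.get(i % nodes.size()));
            }

            return jobs;
        }

        @Override
        public int reduce(List<Integer> results) {
            int sum = 0;

            for (int res : results) {
                sum += res;
            }

            return sum;
        }
    }

    /** Runs every mapped job locally instead of sending it to its node. */
    static int run(SketchTask task, List<String> nodes, String arg) {
        List<Integer> results = new ArrayList<>();

        for (SketchJob job : task.map(nodes, arg).keySet()) {
            results.add(job.execute());
        }

        return task.reduce(results);
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("node-1", "node-2");

        System.out.println(run(new CharCountTask(), nodes, "hello grid world"));
    }
}
```

The argument "hello grid world" is split into three jobs returning 5, 4, and 5, which reduce to 14.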

See the GridTask and GridJob Javadoc documentation for more information about their API.

Executing Grid Tasks

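Grid-enabling is the process of making a piece of Java code execute on the grid. In GridGain, there are two ways to grid-enable code: executing a grid task directly through the API, or annotating an existing method with the @Gridify annotation (annotation-based AOP).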

Direct Execution vs. Annotation-Based AOP

Neither of these two methods is better or worse than the other; each has its own area of applicability. When creating a grid task, you have basically the same programming and development model as in JEE: you create a component, deploy it, and execute it. With annotation-based grid-enabling, you have the extra option of transparently attaching grid-enabling logic to existing code without modifying it (except for the additional annotation).

API-Based Grid Task Execution

This method lets you grid-enable arbitrary Java code. You have full control over the split and aggregate logic and all other aspects of grid task execution. Here is an example of direct grid task execution:

public static void main(String[] args) throws GridException {
    GridFactory.start();

    try {
        Grid grid = GridFactory.getGrid();

        // Execute task.
        GridTaskFuture<String> future = grid.execute(FooBarTask.class, "Argument");

        // Wait for task completion.
        String result = future.get();

        // Print out task result.
        System.out.println("Task result: " + result);
    }
    finally {
        GridFactory.stop(true);
    }
}

Refer to HelloWorld - Basic Grid Task for more coding examples.

Annotate Existing Method With Gridify Annotation

The only difference between this method and directly executing a grid task is that you can annotate a regular Java method and it will become grid-enabled. With this technique you can still have a custom grid task that handles annotation-based grid-enabling (including split and aggregate logic or passing state to remote jobs), but you are limited to the boundaries of the method you are grid-enabling. Here is an example of such usage:

@Gridify(taskClass = FooBarTask.class, timeout = 3000)
public void sayIt(String arg) {
    // Some business logic.
}

Refer to HelloWorld - Gridify With Custom Task for more coding examples.

For information on how to configure AOP, refer to AOP Configuration section.

Serializable State

Note that when the @Gridify annotation is used on a non-static method without an explicit grid task, the state of the whole instance is serialized and sent to the remote node. The class must therefore implement the java.io.Serializable interface. If you cannot make the class Serializable, you must implement a custom grid task that takes care of proper state initialization (see the HelloWorld - Gridify With State example). In either case, GridGain must be able to serialize the state passed to the remote node.
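
Whether an instance satisfies this requirement can be checked locally before anything is sent to a grid. The following is a minimal sketch that uses only java.io; GoodService and BadService are hypothetical classes, and the check assumes standard Java serialization, which may differ from the marshaller a given GridGain installation is configured with.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Checks whether an object's state can be written with standard Java serialization. */
final class SerializableStateCheck {
    /** Hypothetical class whose instance state can be shipped to another node. */
    static final class GoodService implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String greeting = "hello";
    }

    /** Hypothetical class that does not implement Serializable. */
    static final class BadService {
        private final String greeting = "hello";
    }

    /** Returns true if the object can be written to an ObjectOutputStream. */
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);

            return true;
        }
        catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("GoodService serializable: " + canSerialize(new GoodService()));
        System.out.println("BadService serializable: " + canSerialize(new BadService()));
    }
}
```

A check like this can be placed in a unit test for any class that carries a grid-enabled non-static method.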

Configuring Grid Tasks

Starting with GridGain 2.1, you can start multiple instances of GridTopologySpi, GridLoadBalancingSpi, GridFailoverSpi, and GridCheckpointSpi. If you do that, you need to tell a task which SPI to use (by default it will use the first SPI in the list).

Add @GridTaskSpis Javadoc annotation to your task to specify which SPIs it wants to use. If this annotation is omitted, then by default GridGain will pick the first corresponding SPI implementation from the array provided in configuration.

For more information and examples refer to Specifying Different SPIs Per GridTask documentation.

Grid Task Execution Sequence

The sequence of task execution can be described as follows:

  1. Upon a request to execute a grid task with a given task name, the system will find the deployed task with that name.
  2. System will create a new Distributed Grid Task Session. Also see GridTaskSession.
  3. System will inject all annotated resources (including the distributed task session) into the grid task instance. See Resource Injection for more information.
  4. System will call method map(...) on the GridTask interface. This method is responsible for splitting the business logic of the grid task into multiple grid jobs (units of execution) and mapping them to grid nodes. Method map(...) returns a map of grid jobs keyed by the grid nodes. Consider using @GridLoadBalancerResource to inject a load balancer into the task for assigning jobs to the best available nodes.
  5. System will start sending grid jobs to their respective nodes.
  6. Upon arrival at the remote node, a grid job is put on the waiting list, which is passed to the underlying GridCollisionSpi SPI.
  7. The Collision SPI on the remote node will apply one of the following scheduling policies:
    Policy   Description
    WAIT     Grid Job will be kept on the waiting list. In this case, the job will not get a chance to execute until the next time the Collision SPI is called. The Collision SPI gets called every time a new job arrives or an active one completes.
    EXECUTE  Grid Job will be moved to the active list (i.e. activated). In this case the system will proceed with job execution.
    REJECT   A job on the waiting list can be rejected before it gets a chance to start executing. In this case the GridJobResult passed into the GridTask.result(GridJobResult, List) method will contain a GridExecutionRejectedException exception. If you are using any of the Grid Task Adapters shipped with GridGain, the job will be failed over automatically for execution on another node.
    CANCEL   If a GridJob is on the active list and is currently executing, it can be canceled by calling the GridJob.cancel() method. Note that in this case the job will still complete and return a result from the GridJob.execute() method.
  8. For activated jobs on remote nodes, system will inject all annotated resources (including distributed task session) into grid job instance. See Resource Injection for more information.
  9. Remote nodes will execute the jobs by calling GridJob.execute() Javadoc method.
  10. If job gets canceled while executing on remote node, then GridJob.cancel() Javadoc method will be called. Note that just like with Thread.interrupt() method, grid job cancellation serves as a hint that a job should stop executing or exhibit some other user defined behavior. Generally it is up to a job to decide whether it wants to react to cancellation or ignore it. Job cancellation can happen for several reasons:
    • Collision SPI has canceled an active job.
    • Parent task has completed without waiting for this job's result.
    • User canceled task by calling GridTaskFuture.cancel() Javadoc method.
  11. Once job execution is complete, the return value will be sent back to parent task and will be passed into GridTask.result(GridJobResult, List) Javadoc method. If job execution resulted in a checked exception, then GridJobResult.getException() Javadoc method will contain that exception. If job execution threw a runtime exception or error, then it will be wrapped into GridUserUndeclaredException Javadoc exception.
  12. Method GridTask.result(GridJobResult, List) is called for each job result. Based on the policy it returns, the system will continue waiting for the remaining results, fail over the current job, or reduce immediately.
    Policy                        Description
    GridJobResultPolicy.WAIT      If this policy is returned, the Grid Task will continue to wait for other job results. If this result is the last job result, the GridTask.reduce(List) method will be called.
    GridJobResultPolicy.REDUCE    If this policy is returned, method GridTask.reduce(List) will be called right away without waiting for other jobs' completion (all remaining jobs will receive a cancel request).
    GridJobResultPolicy.FAILOVER  If this policy is returned, the job will be failed over to another node for execution. The node to which the job gets failed over is decided by the GridFailoverSpi SPI implementation. Note that if you use any of the Grid Task Adapters, they will automatically fail over jobs to other nodes for 2 known failure cases: node crash and job rejection.
  13. When enough results are received, method GridTask.reduce(List) Javadoc is called to aggregate (reduce) these results into one final grid task result.
  14. After reduce(...) completes, the result is returned to the user as the grid task result and can be retrieved from the GridTaskFuture.get() method.
  15. System will clean up all task session resources (such as checkpoints with session scope). Execution of the grid task is considered finished at this point.
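
The way the three result policies from step 12 steer a task can be modeled without a grid. The sketch below is a simplified local simulation and not GridGain code: Policy, Result, and the quorum rule are assumptions introduced for illustration, results "arrive" from a queue, and failover is simulated by re-queueing the same job as if it had succeeded on another node.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Local simulation of WAIT / REDUCE / FAILOVER handling. All types are hypothetical. */
final class ResultPolicySketch {
    enum Policy { WAIT, REDUCE, FAILOVER }

    /** Stand-in for a job result: the job's data plus a flag for a failed execution. */
    static final class Result {
        final int data;
        final boolean failed;

        Result(int data, boolean failed) {
            this.data = data;
            this.failed = failed;
        }
    }

    /** Decides what to do with one result, given the results accepted so far. */
    static Policy result(Result res, List<Result> received, int quorum) {
        if (res.failed) {
            return Policy.FAILOVER;
        }

        // Reduce early once this result brings the accepted count up to the quorum.
        return received.size() + 1 >= quorum ? Policy.REDUCE : Policy.WAIT;
    }

    /** Aggregates accepted results by summing them. */
    static int reduce(List<Result> received) {
        int sum = 0;

        for (Result res : received) {
            sum += res.data;
        }

        return sum;
    }

    /** Processes results in arrival order and returns the reduced value. */
    static int run(List<Result> arrivals, int quorum) {
        Deque<Result> pending = new ArrayDeque<>(arrivals);
        List<Result> received = new ArrayList<>();

        while (!pending.isEmpty()) {
            Result res = pending.poll();
            Policy policy = result(res, received, quorum);

            if (policy == Policy.FAILOVER) {
                // Simulated failover: the job runs again elsewhere and succeeds.
                pending.add(new Result(res.data, false));

                continue;
            }

            received.add(res);

            if (policy == Policy.REDUCE) {
                break; // Remaining jobs would receive a cancel request here.
            }
        }

        return reduce(received);
    }

    /** Three results: 1 (ok), 2 (failed on first attempt), 3 (ok). */
    static List<Result> sample() {
        return Arrays.asList(new Result(1, false), new Result(2, true), new Result(3, false));
    }

    public static void main(String[] args) {
        System.out.println(run(sample(), 3));
        System.out.println(run(sample(), 2));
    }
}
```

With a quorum of 3 the failed job is retried and all three values are summed to 6; with a quorum of 2 the task reduces as soon as the results 1 and 3 are in, giving 4, and the retried job is never consumed.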

Grid Task Coding Guidelines

There are certain known patterns and anti-patterns to be aware of when developing grid tasks and jobs.

Serialization and Deserialization

Jobs created by a task are moved from one grid node to another (see Grid Tasks And Grid Jobs). Before being sent, they are serialized into a byte stream and thus need to implement the java.io.Serializable interface. On the remote node, every job is deserialized with a class loader that depends on the deployment method (see Grid Deployment).

Prior to GridGain 2.1, every grid job class member (including those of super classes), except for static members, had to implement java.io.Serializable. Static class members are not sent to the remote node and should be initialized there. Note also that task parameters passed into the GridJob.execute() method are sent to remote nodes and need to implement java.io.Serializable as well.

Starting with GridGain 2.1, you can configure different Grid Marshallers; depending on the marshaller, serialization may or may not be required.

Inner and Anonymous Classes

Any kind of inner or anonymous class is allowed. Write your code as you usually do and GridGain will distribute it. You can implement your job as an anonymous class within the grid task class and use task class members inside your job. Here is an example of an anonymous job:

01 import java.io.*;
02 import java.util.*;
03 import org.gridgain.grid.*;
04
05 /**
06  * Test task with anonymous job which uses method scope variable. 
08  */
09 public class TestGridTask extends GridTaskSplitAdapter<String> {
10     /** Dummy multiplier. */
11     private int multiplier = 3;
12
13     /**
14      * This method is responsible for splitting a task into multiple jobs.
15      */
16     @Override
17     protected Collection<? extends GridJob> split(int gridSize, final String arg) throws GridException {
18         List<GridJobAdapter<String>> jobs = new ArrayList<GridJobAdapter<String>>(gridSize);
19        
20         for (int i = 0; i < gridSize; i++) {
21             jobs.add(new GridJobAdapter<String>() {
22                 /**
23                  * Every job simply multiplies number of characters in the argument by some multiplier.
24                  */
25                 public Serializable execute() throws GridException {
26                     return multiplier * arg.length();
27                 }
28             });
29         }
30        
31         return jobs;
32     }
33
34     /**
35      * Reduces multiple job results into one task result.
36      */
37     public Object reduce(List<GridJobResult> results) throws GridException {
38         int sum = 0;
39           
40         // For the sake of this example, let's sum all results.
41         for (GridJobResult res : results) {
42             sum += (Integer)res.getData();
43         }
44
45         return sum;
46     }
47 }

Here, the anonymous job class created at line 21 uses two things from the task class: the method-scope variable arg, declared in the method signature at line 17 and used in the job at line 26, and the task class member multiplier, declared at line 11 and also used at line 26.
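
One consequence worth knowing: an anonymous class that reads a member of its enclosing instance holds a reference to that instance, so under standard Java serialization the enclosing object travels with the job. The sketch below demonstrates this with java.io only. SketchJob, SerializableTask, and PlainTask are hypothetical stand-ins, and a non-default GridGain marshaller may behave differently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Shows that an anonymous job drags its enclosing instance into serialization. */
final class AnonymousCaptureSketch {
    /** Hypothetical stand-in for a serializable grid job. */
    interface SketchJob extends Serializable {
        int execute();
    }

    /** Enclosing class that is Serializable: its anonymous job can be shipped. */
    static class SerializableTask implements Serializable {
        private static final long serialVersionUID = 1L;

        private int multiplier = 3;

        SketchJob split(final String arg) {
            return new SketchJob() {
                @Override
                public int execute() {
                    // Uses both the captured variable and the enclosing field.
                    return multiplier * arg.length();
                }
            };
        }
    }

    /** Enclosing class that is NOT Serializable: its anonymous job cannot be shipped. */
    static class PlainTask {
        private int multiplier = 3;

        SketchJob split(final String arg) {
            return new SketchJob() {
                @Override
                public int execute() {
                    return multiplier * arg.length();
                }
            };
        }
    }

    /** Serializes the job, executes the deserialized copy; returns -1 if that fails. */
    static int executeCopy(SketchJob job) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();

            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(job);
            }

            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return ((SketchJob) in.readObject()).execute();
            }
        }
        catch (IOException | ClassNotFoundException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(executeCopy(new SerializableTask().split("grid")));
        System.out.println(executeCopy(new PlainTask().split("grid")));
    }
}
```

The first call yields 12 (3 times the length of "grid"); the second yields -1 because the enclosing PlainTask instance cannot be serialized.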

Overriding Methods with Gridify Annotation

If you have the following code:

public class A {
    @Gridify
    protected void methodA() {
        ...
    }
}

public class B extends A {
    @Override
    protected void methodA() {
        ...
        super.methodA();
        ...
    }
}

and use aspects, B.methodA() will be called twice, first on your local node and then on the remote node, regardless of class or method modifiers. This is a feature of the aspects implementation, and we do not recommend using @Gridify in parent classes.

Here is a step-by-step explanation:

  1. You create an object of class B.
  2. You call B.methodA(). Since this method has no annotation in class B, the aspect is not applied.
  3. Your B.methodA() executes and calls super.methodA().
  4. A.methodA() has the annotation, so the aspect calls GridGain, which distributes your object of class B and the method call to a grid node.
  5. On the grid node (local or remote), B.methodA() is called again (note that the object is of class B).
  6. Your B.methodA() executes and calls super.methodA().
  7. A.methodA() has the annotation, but GridGain detects this situation: the call is not distributed a second time and is simply executed.

As you can see, there are two executions of B.methodA() and only one of A.methodA().
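
The call counts can be reproduced with a small plain-Java model. This is only a simulation under stated assumptions: no aspects or GridGain classes are involved, the "distribution" step is modeled by re-invoking the method on the same object (a real grid would invoke it on a deserialized copy), and the distributed flag is a hypothetical stand-in for the mechanism that prevents a second distribution.

```java
/** Simulates why an annotated parent method leads to two calls of the override. */
final class GridifyOverrideSketch {
    static class A {
        /** Number of times the body of A.methodA() actually ran. */
        int bodyCalls;

        /** Hypothetical marker: true while the call already runs "on the grid". */
        private boolean distributed;

        protected void methodA() {
            if (!distributed) {
                // Simulated aspect: intercept the annotated method and invoke it
                // again on the target object; virtual dispatch selects B.methodA().
                distributed = true;

                try {
                    methodA();
                }
                finally {
                    distributed = false;
                }

                return;
            }

            // Already distributed: just execute the business logic.
            bodyCalls++;
        }
    }

    static class B extends A {
        /** Number of times B.methodA() was entered. */
        int overrideCalls;

        @Override
        protected void methodA() {
            overrideCalls++;

            super.methodA();
        }
    }

    /** Returns {calls of B.methodA(), executions of A.methodA() body}. */
    static int[] run() {
        B b = new B();

        b.methodA();

        return new int[] {b.overrideCalls, b.bodyCalls};
    }

    public static void main(String[] args) {
        int[] counts = run();

        System.out.println("B.methodA() calls: " + counts[0]);
        System.out.println("A.methodA() body executions: " + counts[1]);
    }
}
```

Running the model reports two calls of B.methodA() and one execution of the A.methodA() body, matching the sequence above.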

Resource Injection

GridTask and GridJob implementations can be injected with grid resources using IoC (dependency injection). Both field-based and method-based injection are supported. The following grid resources can be injected:

Resource Description
@GridTaskSessionResource Javadoc Injects Distributed Grid Task Session.
@GridInstanceResource Javadoc Injects the actual instance of Grid Javadoc this task is executed on.
@GridLoggerResource Javadoc Injects an instance of GridLogger Javadoc logger used by this grid instance.
@GridHomeResource Javadoc Injects a path to GridGain installation home.
@GridExecutorServiceResource Javadoc Injects an instance of java.util.concurrent.ExecutorService used by this grid.
@GridLocalNodeIdResource Javadoc Injects local grid node ID of type java.util.UUID.
@GridMBeanServerResource Javadoc Injects an instance of javax.management.MBeanServer used by this grid node.
@GridJobIdResource Javadoc This resource can only be injected into Grid Jobs and not Grid Tasks. It injects unique job execution ID of type java.util.UUID into an instance of Grid Job.
@GridSpringApplicationContextResource Javadoc This resource injects the Spring application context into tasks and jobs. You can use it for accessing Spring beans or any other information available in Spring application context. By default, this application context is the same as the one used for configuring GridGain, but you can pass a custom one by calling GridFactory.start(GridConfiguration, ApplicationContext) Javadoc method. Note that Spring Application Context is local to every node and is not distributed. Make sure that all bean classes and resources declared in Spring file are available on the node's classpath.
@GridUserResource Javadoc Use this annotation to inject custom resources into tasks and jobs. The scope of this resource is per-task, so it will be initialized once the task is deployed and de-initialized once task is undeployed. Also see @GridUserResourceOnDeployed Javadoc and @GridUserResourceOnUndeployed Javadoc for controlling resource life cycle.
@GridMarshallerResource Javadoc This resource can be injected into a task, job, or SPI and gives you a simple way of marshalling and unmarshalling data or objects (since 2.1.0).
@GridSpringBeanResource Javadoc Injects any custom resource declared in the provided Spring ApplicationContext. It can be injected into grid tasks and grid jobs. The resource is looked up in the provided Spring ApplicationContext by its name value. Note that an injected Spring bean must be declared in the Spring ApplicationContext on every grid node where it is accessed (since 2.1.0).

Refer to Resources Injection for more information.
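
For readers unfamiliar with annotation-driven injection, the mechanism behind field-based injection can be sketched with plain reflection. This is an illustration of the general technique only, not of how GridGain implements it: the @NodeIdResource annotation, SketchJob, and inject(...) are hypothetical names introduced for this sketch.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.UUID;

/** Minimal field-based injection via reflection. All names are hypothetical. */
final class InjectionSketch {
    /** Hypothetical marker annotation for a field that receives a node ID. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface NodeIdResource {
    }

    /** Hypothetical job that declares the resource it wants. */
    static final class SketchJob {
        @NodeIdResource
        private UUID nodeId;

        UUID nodeId() {
            return nodeId;
        }
    }

    /** Sets every UUID field annotated with @NodeIdResource on the target. */
    static void inject(Object target, UUID nodeId) {
        for (Field field : target.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(NodeIdResource.class) && field.getType() == UUID.class) {
                field.setAccessible(true);

                try {
                    field.set(target, nodeId);
                }
                catch (IllegalAccessException e) {
                    throw new IllegalStateException("Cannot inject field: " + field.getName(), e);
                }
            }
        }
    }

    /** Creates a job, injects the ID, and returns what the job sees. */
    static UUID injectAndRead(UUID nodeId) {
        SketchJob job = new SketchJob();

        inject(job, nodeId);

        return job.nodeId();
    }

    public static void main(String[] args) {
        System.out.println(injectAndRead(UUID.randomUUID()));
    }
}
```

Before injection the field is null; the container, not the job, is responsible for populating it, which is why injected resources should not be read in a constructor.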

Convenience Adapters

GridTask Javadoc and GridJob Javadoc come with several convenience adapters to make the usage easier:

Adapter Description
GridTaskAdapter Javadoc Grid Task adapter that provides default implementation for GridTask.result(GridJobResult, List) Javadoc method which implements automatic fail-over to another node if remote job has failed due to a node crash (detected by GridTopologyException Javadoc exception) or due to job execution rejection (detected by GridExecutionRejectedException Javadoc exception).
GridTaskSplitAdapter Javadoc Grid Task adapter that hides the job-to-node mapping logic from user and provides convenient GridTaskSplitAdapter.split(int, Object) Javadoc method for splitting task into sub-jobs in homogeneous environments.
GridJobAdapter Javadoc Grid Job adapter that provides default empty implementation for GridJob.cancel() Javadoc method and also allows user to set and get job argument, if there is one.

Refer to corresponding adapter documentation for more information.
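
Because the default cancel() implementation described above is empty, a job that should stop early has to record the cancel request and check it during execution. A minimal sketch of that cooperative pattern follows; CountingJob is a hypothetical plain-Java class rather than a GridJobAdapter subclass, and the cancel request is delivered through a callback so that the example stays single-threaded and deterministic.

```java
import java.util.function.IntConsumer;

/** Cooperative cancellation: cancel() only sets a flag, execute() decides how to react. */
final class CancellableJobSketch {
    /** Hypothetical job that performs a fixed number of steps unless canceled. */
    static final class CountingJob {
        private final int totalSteps;

        /** Volatile so that a cancel request from another thread becomes visible. */
        private volatile boolean cancelled;

        CountingJob(int totalSteps) {
            this.totalSteps = totalSteps;
        }

        /** Records the cancel request; acts as a hint, much like Thread.interrupt(). */
        void cancel() {
            cancelled = true;
        }

        /** Runs steps until done or canceled and still returns a (partial) result. */
        int execute(IntConsumer afterStep) {
            int done = 0;

            while (done < totalSteps && !cancelled) {
                done++;

                afterStep.accept(done);
            }

            return done;
        }
    }

    /** Runs a job and issues a cancel request right after the given step. */
    static int runAndCancelAfter(int totalSteps, final int cancelAfter) {
        final CountingJob job = new CountingJob(totalSteps);

        return job.execute(step -> {
            if (step == cancelAfter) {
                job.cancel();
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(runAndCancelAfter(10, 4));
        System.out.println(runAndCancelAfter(10, 99));
    }
}
```

A job canceled after step 4 returns 4 as its partial result, while a job that never sees the cancel request completes all 10 steps.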

Distributed Session Attributes And Checkpoints

Both Grid Tasks and Grid Jobs can use the Distributed Grid Task Session to coordinate with each other via session attributes and checkpoints.

Session Attributes

Jobs can communicate with parent task and with other job siblings from the same task by setting session attributes (see GridTaskSession Javadoc ). Other jobs can wait for an attribute to be set either synchronously or asynchronously. Such functionality allows jobs to synchronize their execution with other jobs at any point and can be useful when other jobs within task need to be made aware of certain event or state change that occurred during job execution.
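
The set-and-wait coordination described above can be modeled locally with a monitor. The sketch below is an assumption-laden stand-in: SketchSession and its two methods are hypothetical and live in a single JVM, whereas a real task session distributes attributes across nodes.

```java
import java.util.HashMap;
import java.util.Map;

/** Single-JVM model of jobs coordinating through session attributes. */
final class SessionAttributeSketch {
    /** Hypothetical stand-in for a task session shared by sibling jobs. */
    static final class SketchSession {
        private final Map<String, Object> attrs = new HashMap<>();

        /** Sets an attribute and wakes up every waiting job. */
        synchronized void setAttribute(String key, Object val) {
            attrs.put(key, val);

            notifyAll();
        }

        /** Blocks until the key is set; returns null on timeout or interruption. */
        synchronized Object waitForAttribute(String key, long timeoutMs) {
            long deadline = System.currentTimeMillis() + timeoutMs;

            try {
                while (!attrs.containsKey(key)) {
                    long left = deadline - System.currentTimeMillis();

                    if (left <= 0) {
                        return null;
                    }

                    wait(left);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();

                return null;
            }

            return attrs.get(key);
        }
    }

    /** One "job" publishes an attribute while another waits for it. */
    static Object demo() {
        final SketchSession ses = new SketchSession();

        Thread producer = new Thread(() -> ses.setAttribute("phase", "ready"));

        producer.start();

        return ses.waitForAttribute("phase", 5000);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The waiting side loops on the condition rather than waiting once, so it behaves correctly whether the attribute is set before or after the wait begins.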

Saving Checkpoints

Long running jobs may wish to save intermediate checkpoints to protect themselves from failures. There are three checkpoint management methods available on Grid Task Session which allow user to save, load, and remove checkpoints.

Jobs that use checkpoint functionality should attempt to load a checkpoint at the beginning of execution. If a non-null value is returned, the job can continue from where it failed last time; otherwise it starts from scratch. Throughout its execution, a job should periodically save its intermediate state to avoid starting from scratch in case of a failure.

Refer to Distributed Grid Task Session documentation for more information.
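
The load, save, and remove pattern can be sketched as follows. This is a minimal local model, not GridGain code: the checkpoint store is a plain HashMap standing in for the session's checkpoint methods, SummingJob and the "sum-progress" key are hypothetical, and a failure is simulated by throwing an exception at a chosen step.

```java
import java.util.HashMap;
import java.util.Map;

/** Local model of a job that resumes from its last saved checkpoint. */
final class CheckpointSketch {
    /** Hypothetical checkpoint key. */
    static final String KEY = "sum-progress";

    /** Sums 1..n, saving {next index, partial sum} every 10 steps. */
    static final class SummingJob {
        private final Map<String, int[]> store;
        private final int n;

        /** Number of steps this job instance actually executed. */
        int steps;

        SummingJob(Map<String, int[]> store, int n) {
            this.store = store;
            this.n = n;
        }

        int execute(int failAtStep) {
            // Load: continue from the checkpoint if one exists, else start from scratch.
            int[] cp = store.get(KEY);
            int i = cp != null ? cp[0] : 1;
            int sum = cp != null ? cp[1] : 0;

            for (; i <= n; i++) {
                if (i == failAtStep) {
                    throw new IllegalStateException("Simulated failure at step " + i);
                }

                sum += i;
                steps++;

                // Save: record where to resume and the state so far.
                if (i % 10 == 0) {
                    store.put(KEY, new int[] {i + 1, sum});
                }
            }

            // Remove: the checkpoint is no longer needed once the job is done.
            store.remove(KEY);

            return sum;
        }
    }

    /** Runs the job, retrying once on failure; returns {sum, steps of the last run}. */
    static int[] runWithOneFailure(int n, int failAtStep) {
        Map<String, int[]> store = new HashMap<>();

        try {
            SummingJob first = new SummingJob(store, n);

            return new int[] {first.execute(failAtStep), first.steps};
        }
        catch (IllegalStateException e) {
            SummingJob retry = new SummingJob(store, n);

            return new int[] {retry.execute(-1), retry.steps};
        }
    }

    public static void main(String[] args) {
        int[] res = runWithOneFailure(100, 57);

        System.out.println("sum=" + res[0] + ", steps in retry=" + res[1]);
    }
}
```

With a failure at step 57, the last checkpoint was written after step 50, so the retry executes only the remaining 50 steps and still arrives at the full sum of 5050. The save interval trades checkpoint overhead against the amount of work repeated after a failure.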

MapReduce Paradigm

The design of GridTask is heavily influenced by Google's MapReduce paradigm. For more information about the MapReduce paradigm, refer to the "MapReduce: Simplified Data Processing on Large Clusters" article from Google.

Examples

For an example of how to use GridTask and GridJob for basic split/aggregate logic, refer to the HelloWorld - Basic Grid Task example. For an example of how to use GridTask with automatic grid-enabling via the @Gridify annotation, refer to the HelloWorld - Gridify With Custom Task example.
