Dashboard > GridGain User Guide > Table Of Contents > Examples Gallery > MapReduce with Prime Numbers and Gridify Annotation > GridifyPrimeTask.java
GridifyPrimeTask.java
Added by ghost, last edited by ghost on Feb 05, 2008  (view change)
Labels: 
(None)


Task responsible for checking passed in value for prime. It creates as many jobs as there are nodes in the grid and sends this job to grid nodes for execution (note, that local node also participates in execution).

Every grid job gets a range of divisors to check. The lower and upper boundaries of this range are passed into job as arguments. The jobs invokes GridPrimeChecker.java to check if the value passed in is divisible by any of the divisors in the range. Refer to GridPrimeChecker.java for algorithm specifics (it is very unsophisticated).

Upon receiving results from every job reduce(List) method is invoked. In this method we determine if any divisor was found. If we found a divisor, then we know that a number is not a prime number and there is no need to wait for other job results. In this case, the method will return GridJobResultPolicy.REDUCE policy in order to start aggregation of results. All remaining jobs will be cancelled by the system. However, note that it is responsibility of GridPrimeChecker.java to constantly check if it was cancelled (via thread interruption) and abort when needed.

Aggregation of results happens in reduce(List) method. In this method we determine if any of the remote jobs returned a divisor.If divisor is found, then we return it, otherwise we return null.

Full Source Code

GridifyPrimeTask.java
package org.gridgain.examples.primenumbers.gridify;

import java.util.*;
import org.gridgain.examples.primenumbers.*;
import org.gridgain.grid.*;
import org.gridgain.grid.gridify.*;

public class GridifyPrimeTask extends GridifyTaskSplitAdapter<Long> {
    /**
     * {@inheritDoc}
     */
    @Override
    protected Collection<? extends GridJob> split(int gridSize, GridifyArgument arg) {
        // Grid-enabled method parameters.
        Object[] args = arg.getMethodParameters();

        // Get parameters from grid-enabled Java method. These are the parameters
        // passed into 'GridPrimeChecker.checkPrime(long, long, long)' method.
        long val = (Long)args[0];
        long taskMinRange = (Long)args[1];

        long numbersPerTask = val / gridSize < 10 ? 10 : val / gridSize;

        List<GridJobAdapter<Long>> jobs = new ArrayList<GridJobAdapter<Long>>(gridSize);

        long jobMinRange = 0;
        long jobMaxRange = 0;

        // In this loop we create as many grid jobs as
        // there are nodes in the grid.
        for (int i = 0; jobMaxRange < val ; i++) {
            jobMinRange = i * numbersPerTask + taskMinRange;
            jobMaxRange = (i + 1) * numbersPerTask + taskMinRange - 1;

            if (jobMaxRange > val) {
                jobMaxRange = val;
            }

            // Pass in value to check, and minimum/maximum range boundaries
            // into job as arguments.
            jobs.add(new GridJobAdapter<Long>(val, jobMinRange, jobMaxRange) {
                /**
                 * Check if the value passed in is divisible by
                 * any of the divisors in the range. If so,
                 * return the first divisor found, otherwise
                 * return <tt>null</tt>.
                 *
                 * @return First divisor found or <tt>null</tt> if no
                 *      divisor was found.
                 *
                 * @see GridJob#execute()
                 */
                public Long execute() throws GridException {
                    long val = getArgument(0);
                    long min = getArgument(1);
                    long max = getArgument(2);

                    // Return first divisor found or null if no
                    // divisor was found.
                    return GridPrimeChecker.checkPrime(val, min, max);
                }
            });
        }

        // List of jobs to be executed on the grid.
        return jobs;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public GridJobResultPolicy result(GridJobResult result, List<GridJobResult> received) throws GridException {
        // If devisor is found then complete right away, otherwise, keep waiting.
        if(result.getData() != null) {
            // Start reducing. All jobs that are still running
            // will be cancelled automatically.
            return GridJobResultPolicy.REDUCE;
        }

        return super.result(result, received);
    }

    /**
     * {@inheritDoc}
     */
    public Long reduce(List<GridJobResult> results) {
        for (GridJobResult res : results) {
            if (res.getData() != null) {
                return res.getData();
            }
        }

        // No divisor was found, the value is 'prime'.
        return null;
    }
}

Powered by Atlassian Confluence, the Enterprise Wiki. (Version: 2.2.10 Build:#528 Nov 29, 2006) - Bug/feature request - Contact Administrators