MapReduce with Node Metrics
Added by ghost, last edited by ghost on Mar 08, 2008


MapReduce with Node Metrics Example.

Package:
org.gridgain.examples.metrics

This example demonstrates split logic that takes into account remote node metrics (see the GridNodeMetrics Javadoc).
Jobs are routed only to nodes that report more than one available processor and a current CPU load below 50%. Node metrics are updated periodically on every node (the update interval depends on the discovery SPI settings), so they reflect the approximate rather than the exact state of a node; averaged metrics smooth out short-lived spikes and therefore tend to be a more reliable basis for routing decisions.

Node metrics can be combined with node attributes, such as operating system or JRE version, to make the split logic even more flexible.
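As a rough illustration of combining the two, the sketch below adds an attribute check on top of the metrics check used in this example. It deliberately avoids GridGain types: the attribute map, the "os.name" and "java.version" keys (borrowed from standard Java system property names) and the accepted values are assumptions for illustration, not the GridGain node attribute API.

```java
import java.util.Map;

/**
 * Hypothetical sketch: combines a node-attribute check (operating system,
 * JRE version) with the metrics check from GridMetricsTask. Attribute keys
 * and accepted values are illustrative assumptions, not GridGain API.
 */
final class AttributeAndMetricsFilter {
    private AttributeAndMetricsFilter() {
        // Static utility class.
    }

    static boolean qualifies(Map<String, String> attrs, int availableProcessors, double cpuLoad) {
        // Attribute check: only Linux nodes running a 1.6 JRE (assumed keys and values).
        String os = attrs.getOrDefault("os.name", "");
        String jre = attrs.getOrDefault("java.version", "");
        boolean attrsOk = os.startsWith("Linux") && jre.startsWith("1.6");

        // Metrics check: same rule as GridMetricsTask (CPU load as a fraction).
        boolean metricsOk = availableProcessors > 1 && cpuLoad < 0.5;

        return attrsOk && metricsOk;
    }

    public static void main(String[] args) {
        Map<String, String> linux = Map.of("os.name", "Linux", "java.version", "1.6.0_05");
        Map<String, String> windows = Map.of("os.name", "Windows XP", "java.version", "1.6.0_05");

        System.out.println(qualifies(linux, 2, 0.3));   // true
        System.out.println(qualifies(windows, 2, 0.3)); // false: OS does not match
    }
}
```

Keeping the attribute check separate from the metrics check makes it easy to change either rule independently.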

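There are two classes implemented for this example: GridMetricsExample.java, which starts the grid and executes the task, and GridMetricsTask.java, which implements the map and reduce logic.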

Running Grid Node

This example needs one remote node to be running. You don't need another machine for it: the remote node can be started on the same machine you run the example on.

To start a remote node, open a terminal window on Linux or Mac OS X (or a Command Prompt on Windows), change directory to ${GRIDGAIN_HOME}/bin, and run the gridgain.{sh|bat} script. The node takes 2-3 seconds to start; if everything worked, the startup log ends with a successful start acknowledgment.

GridMetricsExample.java

1. Import GridGain classes.

import org.gridgain.grid.*;

2. Add Grid Start and Stop.

GridFactory.start();
        
try {
    ...
}
finally {
    GridFactory.stop(true);
}

The finally clause ensures a graceful grid shutdown even if an exception is thrown.
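The effect of the finally clause can be seen with a small stand-in. FakeGrid and runOnGrid are hypothetical placeholders for GridFactory.start() and GridFactory.stop(true); the sketch only shows that stop() runs even when the work throws.

```java
/**
 * Hypothetical stand-in for the start/try/finally pattern used in
 * GridMetricsExample. FakeGrid only records whether stop() was called.
 */
final class GracefulShutdownDemo {
    /** Hypothetical placeholder for a started grid instance. */
    static final class FakeGrid {
        private boolean stopped;

        void stop() {
            stopped = true;
        }

        boolean isStopped() {
            return stopped;
        }
    }

    /** Runs the work and always stops the grid afterwards. */
    static void runOnGrid(FakeGrid grid, Runnable work) {
        try {
            work.run();
        }
        finally {
            // Mirrors GridFactory.stop(true): executed on success and on failure.
            grid.stop();
        }
    }

    public static void main(String[] args) {
        FakeGrid grid = new FakeGrid();

        try {
            runOnGrid(grid, () -> { throw new IllegalStateException("task failed"); });
        }
        catch (IllegalStateException e) {
            System.out.println("Caught: " + e.getMessage());
        }

        System.out.println("Grid stopped: " + grid.isStopped()); // Grid stopped: true
    }
}
```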

3. Add Grid Task Execution.

Grid grid = GridFactory.getGrid();

grid.execute(GridMetricsTask.class, null).get();

Note that grid.execute(...) returns a GridTaskFuture (see the GridTaskFuture Javadoc). Calling get() on it blocks until task execution completes. This is the standard approach borrowed from the java.util.concurrent.Future interface.
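The same blocking behavior can be observed with the plain java.util.concurrent API. In this sketch an ExecutorService plays the role of the grid (an analogy, not GridGain code): submit() returns a Future immediately, and get() blocks the caller until the result is ready.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Shows how Future.get() blocks until an asynchronous computation completes. */
final class FutureGetDemo {
    private FutureGetDemo() {
        // Static utility class.
    }

    static String runAndWait() throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newSingleThreadExecutor();

        try {
            // submit() returns at once with a Future, much like grid.execute(...).
            Future<String> fut = exec.submit(() -> {
                Thread.sleep(100); // Simulate some work.
                return "done";
            });

            // get() blocks the calling thread until the result is available.
            return fut.get();
        }
        finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndWait()); // done
    }
}
```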

Full Source Code

GridMetricsExample.java
package org.gridgain.examples.metrics;

import org.gridgain.grid.*;

public final class GridMetricsExample {
    /**
     * Enforces non-instantiability.
     */
    private GridMetricsExample() {
        // No-op.
    }

    /**
     * Executes <tt>GridMetricsExample</tt> on the grid.
     *
     * @param args Command line arguments, none required but if provided
     *      first one should point to the Spring XML configuration file. See
     *      <tt>"examples/config/"</tt> for configuration file examples.
     * @throws GridException If example execution failed.
     */
    public static void main(String[] args) throws GridException {
        if (args.length == 0) {
            GridFactory.start();
        }
        else {
            GridFactory.start(args[0]);
        }

        try {
            Grid grid = GridFactory.getGrid();

            // Execute task.
            grid.execute(GridMetricsTask.class, null).get();

            System.out.println(">>>");
            System.out.println(">>> Finished execution of GridMetricsExample.");
            System.out.println(">>> Check log output for every node.");
            System.out.println(">>>");
        }
        finally {
            GridFactory.stop(true);
        }
    }
}

GridMetricsTask.java

This is a grid task implementation responsible for the split and aggregate (a.k.a. map/reduce) logic. It extends GridTaskAdapter and implements its two main methods: map, which creates jobs and assigns them to nodes, and reduce, which aggregates job results. Here reduce does nothing, because all jobs return null.
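In this example reduce has nothing to aggregate. If the jobs did return values, reduce would be the place to combine them. The sketch below is a hypothetical illustration that models job results as a plain List<Integer> instead of GridJobResult objects and sums them, skipping null results like the ones GridMetricsJob produces.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical reduce step that sums job results and ignores nulls. */
final class ReduceSketch {
    private ReduceSketch() {
        // Static utility class.
    }

    static int reduce(List<Integer> jobResults) {
        int total = 0;

        for (Integer r : jobResults) {
            // Jobs that return nothing (like GridMetricsJob) contribute nothing.
            if (r != null) {
                total += r;
            }
        }

        return total;
    }

    public static void main(String[] args) {
        System.out.println(reduce(Arrays.asList(2, 4, null)));              // 6
        System.out.println(reduce(Arrays.asList((Integer) null, null)));    // 0
    }
}
```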

For the purpose of this example, the task inspects every node in its topology and sends a job to a node only if that node has more than one processor and its current CPU load is less than 50%. If no node matches these criteria, the job is executed on the local node.
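The selection-with-fallback logic of map can be sketched without GridGain types. NodeInfo is a hypothetical stand-in for a GridNode together with its GridNodeMetrics, and node ids are plain strings; assign() returns the ids of the nodes that would each receive one job.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of GridMetricsTask.map(): filter nodes, fall back to local. */
final class MapWithFallback {
    /** Hypothetical snapshot of a node's id and metrics. */
    static final class NodeInfo {
        final String id;
        final int availableProcessors;
        final double cpuLoad; // Fraction in [0, 1], as compared in GridMetricsTask.

        NodeInfo(String id, int availableProcessors, double cpuLoad) {
            this.id = id;
            this.availableProcessors = availableProcessors;
            this.cpuLoad = cpuLoad;
        }
    }

    /** Returns the ids of the nodes that should each receive one job. */
    static List<String> assign(List<NodeInfo> subgrid, String localNodeId) {
        List<String> targets = new ArrayList<>();

        for (NodeInfo n : subgrid) {
            // Same criteria as the example: more than one processor and CPU load below 50%.
            if (n.availableProcessors > 1 && n.cpuLoad < 0.5) {
                targets.add(n.id);
            }
        }

        // No node qualified: execute a single job on the local node.
        if (targets.isEmpty()) {
            targets.add(localNodeId);
        }

        return targets;
    }

    public static void main(String[] args) {
        List<NodeInfo> nodes = List.of(
            new NodeInfo("local", 1, 0.10),
            new NodeInfo("remote-1", 4, 0.20),
            new NodeInfo("remote-2", 2, 0.80));

        System.out.println(assign(nodes, "local"));                                     // [remote-1]
        System.out.println(assign(List.of(new NodeInfo("remote-2", 2, 0.80)), "local")); // [local]
    }
}
```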

Full Source Code

GridMetricsTask.java
package org.gridgain.examples.metrics;

import org.gridgain.grid.*;
import org.gridgain.grid.logger.*;
import org.gridgain.grid.resources.*;
import java.util.*;
import java.io.*;

public class GridMetricsTask extends GridTaskAdapter<Object, Object> {
    /** Injected grid logger. */
    @GridLoggerResource
    private GridLogger log = null;

    /** Injected grid instance. */
    @GridInstanceResource
    private Grid grid = null;

    /**
     * This task will create jobs and send them to remote nodes only
     * if remote node has more than 1 processor and CPU load on remote
     * node is less than 50%. 
     * <p>
     * If none of the nodes meets the criteria above, the job is
     * executed locally.
     * 
     * @param subgrid Task node topology.
     * @param arg Task argument (ignored for this example).
     * @return {@link GridJob} instances mapped to nodes for execution.
     * @throws GridException If map operation failed. 
     */
    public Map<? extends GridJob, GridNode> map(List<GridNode> subgrid, Object arg) throws GridException {
        Map<GridJobAdapter<Serializable>, GridNode> jobs = 
            new HashMap<GridJobAdapter<Serializable>, GridNode>(subgrid.size());

        for (GridNode node : subgrid) {
            // Get metrics for given node.
            GridNodeMetrics metrics = node.getMetrics();

            log.info("Checking node metrics [nodeId=" + node.getId() + ", cpuLoad=" + metrics.getCurrentCpuLoad() +
                ", availableProcessors=" + metrics.getAvailableProcessors() + ']');

            // For the sake of this example, we only send a job to a node 
            // if it has more than one processor and if its CPU is less than 50% loaded.
            if (metrics.getAvailableProcessors() > 1 && metrics.getCurrentCpuLoad() < 0.5) {
                jobs.put(new GridMetricsJob(), node);
            }
        }
        
        // If no node qualified for job execution, because each node either
        // had only one processor or had a CPU load of 50% or more,
        // then execute the job locally.
        if (jobs.isEmpty()) {
            jobs.put(new GridMetricsJob(), grid.getLocalNode());
        }

        return jobs;
    }

    /**
     * {@inheritDoc}
     */
    public Object reduce(List<GridJobResult> results) throws GridException {
        // Nothing to reduce.
        return null;
    }

    /**
     * Example job to demonstrate node metrics usage.
     * The execution simply prints out the metrics for
     * the local node.
     *
     * @author morpheus
     * @version 2.0
     */
    private static class GridMetricsJob extends GridJobAdapter<Serializable> {
        /** Injected grid instance. */
        @GridInstanceResource
        private Grid grid = null;

        /** Injected grid logger. */
        @GridLoggerResource
        private GridLogger log = null;
        
        /**
         * For the purpose of this example, we simply print out metrics
         * for the node this job is running on.
         * 
         * @return <tt>null</tt> as the job simply prints out metrics on
         *      local node.
         */
        public Serializable execute() {
            // Simply print out metrics for the node this job is running on
            // and return nothing.
            log.info("Printing node metrics from grid job [nodeId=" + grid.getLocalNode().getId() + 
                ", nodeMetrics=" + grid.getLocalNode().getMetrics() + ']');
            
            // Nothing to return.
            return null;
        }
    }
}
