MapReduce with Node Metrics Example.
Package:
org.gridgain.examples.metrics
This example demonstrates split logic that takes remote node metrics (see GridNodeMetrics) into account.
Jobs are routed only to nodes that have more than one available processor and a current CPU load below 50%. Every node updates its metrics periodically, and the update interval depends on the discovery SPI settings. Averaged metrics therefore give a reasonably accurate picture of a node's state.
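The benefit of averaging can be shown with a small sketch. The class below is hypothetical and is not how GridGain computes its metrics. It keeps a fixed window of CPU-load samples. It shows that one short spike pushes the latest sample above the 50% threshold while the average stays below it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sketch (not GridGain's implementation): keeps the last N
 * CPU-load samples of a node and exposes both the latest ("current")
 * value and the moving average over the window.
 */
class CpuLoadWindow {
    private final int capacity;
    private final Deque<Double> samples = new ArrayDeque<Double>();

    CpuLoadWindow(int capacity) {
        this.capacity = capacity;
    }

    /** Records a sample (0.0 = idle, 1.0 = fully loaded), evicting the oldest one when full. */
    void record(double load) {
        if (samples.size() == capacity) {
            samples.removeFirst();
        }
        samples.addLast(load);
    }

    /** Most recent sample, or 0.0 if nothing has been recorded yet. */
    double current() {
        return samples.isEmpty() ? 0.0 : samples.peekLast();
    }

    /** Arithmetic mean of the retained samples, or 0.0 if the window is empty. */
    double average() {
        if (samples.isEmpty()) {
            return 0.0;
        }
        double sum = 0.0;
        for (double s : samples) {
            sum += s;
        }
        return sum / samples.size();
    }

    public static void main(String[] args) {
        CpuLoadWindow window = new CpuLoadWindow(5);
        // Four quiet samples followed by one short spike.
        for (double load : new double[] {0.2, 0.1, 0.2, 0.1, 0.9}) {
            window.record(load);
        }
        // The spike makes the current load fail a "< 0.5" check; the average still passes it.
        System.out.println("current=" + window.current() + ", average=" + window.average());
    }
}
```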
Node metrics can be combined with node attributes, such as the operating system or JRE version, to make the split logic even more flexible.
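A filter that combines a static node attribute with dynamic metrics might look like the following. NodeFilter, NodeInfo and the attribute key "os.name" are assumptions made for illustration. In the real example, the equivalent data comes from GridNode and GridNodeMetrics.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical filter combining a node attribute with node metrics. */
class NodeFilter {
    /** Simplified stand-in for a grid node: attributes plus two metrics. */
    static final class NodeInfo {
        final Map<String, String> attributes;
        final int availableProcessors;
        final double currentCpuLoad; // 0.0 .. 1.0

        NodeInfo(Map<String, String> attributes, int availableProcessors, double currentCpuLoad) {
            this.attributes = attributes;
            this.availableProcessors = availableProcessors;
            this.currentCpuLoad = currentCpuLoad;
        }
    }

    /**
     * Accepts a node only if its (assumed) "os.name" attribute starts with the
     * given prefix AND its metrics pass the thresholds used in this example.
     */
    static boolean accepts(NodeInfo node, String osPrefix) {
        String os = node.attributes.get("os.name");
        boolean osMatches = os != null && os.startsWith(osPrefix);
        boolean metricsOk = node.availableProcessors > 1 && node.currentCpuLoad < 0.5;
        return osMatches && metricsOk;
    }

    public static void main(String[] args) {
        Map<String, String> linux = new HashMap<String, String>();
        linux.put("os.name", "Linux");
        Map<String, String> windows = new HashMap<String, String>();
        windows.put("os.name", "Windows XP");

        System.out.println(accepts(new NodeInfo(linux, 4, 0.2), "Linux"));   // true
        System.out.println(accepts(new NodeInfo(windows, 4, 0.2), "Linux")); // false: attribute mismatch
        System.out.println(accepts(new NodeInfo(linux, 1, 0.2), "Linux"));   // false: only one processor
    }
}
```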
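There are two classes implemented for this example: GridMetricsExample, which starts the grid and executes the task, and GridMetricsTask, which contains the split and reduce logic.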
Running Grid Node
This example requires one remote node to be running. You don't need another machine for it: you can start the remote node on the same machine you are running the example on.
To start a remote node, open a terminal window on Linux/Mac OS X or a Command Prompt on Windows, change directory to ${GRIDGAIN_HOME}/bin, and run the gridgain.{sh|bat} script. The node takes 2-3 seconds to start. If everything works, the startup log ends with a message acknowledging a successful start.
GridMetricsExample.java
1. Import GridGain classes.
import org.gridgain.grid.*;
2. Add Grid Start and Stop.
GridFactory.start();
try {
...
}
finally {
GridFactory.stop(true);
}
The finally clause ensures that the grid shuts down gracefully even if an exception is thrown.
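The sketch below shows why the finally clause matters. FakeGrid is a hypothetical stand-in for GridFactory.start() and GridFactory.stop(true). It is used only to show that the stop call runs even when the task throws.

```java
/** Hypothetical sketch of the start / try / finally / stop pattern. */
class GracefulShutdownSketch {
    /** Stand-in for the grid lifecycle; not a GridGain class. */
    static final class FakeGrid {
        boolean running;

        void start() { running = true; }

        void stop() { running = false; }
    }

    /** Runs a task between start() and stop(); stop() runs even if the task throws. */
    static void runExample(FakeGrid grid, Runnable task) {
        grid.start();
        try {
            task.run();
        }
        finally {
            // Executed both on normal completion and when task.run() throws.
            grid.stop();
        }
    }

    public static void main(String[] args) {
        FakeGrid grid = new FakeGrid();
        try {
            runExample(grid, () -> { throw new IllegalStateException("task failed"); });
        }
        catch (IllegalStateException e) {
            // Prints: caught: task failed, running=false
            System.out.println("caught: " + e.getMessage() + ", running=" + grid.running);
        }
    }
}
```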
3. Add Grid Task Execution.
grid.execute(GridMetricsTask.class, null).get();
Note that grid.execute(...) returns a GridTaskFuture. Calling get() on it waits for the task to complete and returns its result.
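Blocking on a future can be illustrated with the standard java.util.concurrent API. This is an analogy that assumes GridTaskFuture.get() behaves like Future.get() here. The sketch does not use GridGain classes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Analogy only: java.util.concurrent.Future, not GridTaskFuture. */
class FutureGetSketch {
    /** submit() returns immediately; get() blocks until the submitted work finishes. */
    static String runAndWait() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> {
                Thread.sleep(100); // simulate work running elsewhere
                return "done";
            });
            // The caller could do other work here before waiting.
            return future.get();
        }
        finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndWait()); // prints "done"
    }
}
```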
Full Source Code
package org.gridgain.examples.metrics;

import org.gridgain.grid.*;

public final class GridMetricsExample {
    /**
     * Prevents instantiation.
     */
    private GridMetricsExample() {
        // No-op.
    }

    /**
     * Executes <tt>GridMetricsExample</tt> on the grid.
     *
     * @param args Command line arguments, none required but if provided
     *      first one should point to the Spring XML configuration file. See
     *      <tt>"examples/config/"</tt> for configuration file examples.
     * @throws GridException If example execution failed.
     */
    public static void main(String[] args) throws GridException {
        if (args.length == 0) {
            GridFactory.start();
        }
        else {
            GridFactory.start(args[0]);
        }

        try {
            Grid grid = GridFactory.getGrid();

            // Execute task and wait for it to complete.
            grid.execute(GridMetricsTask.class, null).get();

            System.out.println(">>>");
            System.out.println(">>> Finished execution of GridMetricsExample.");
            System.out.println(">>> Check log output for every node.");
            System.out.println(">>>");
        }
        finally {
            GridFactory.stop(true);
        }
    }
}
GridMetricsTask.java
This is a grid task implementation that is responsible for the split and aggregate (a.k.a. map/reduce) logic. Note that this implementation extends GridTaskAdapter and implements map(...) directly, so it decides explicitly which node each job is sent to.
For the purpose of this example, the task inspects all nodes in its topology and sends a job to a node only if that node has more than one processor and its current CPU load is below 50%. If no node matches these criteria, the local node is used for execution.
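The selection rule can be sketched without any GridGain dependency. MetricsSelectionSketch and its Node class are hypothetical simplifications of GridNode and GridNodeMetrics. The thresholds and the local-node fallback mirror the map() method of GridMetricsTask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, GridGain-free sketch of the node selection in GridMetricsTask.map(). */
class MetricsSelectionSketch {
    /** Simplified node: an id plus the two metrics the task inspects. */
    static final class Node {
        final String id;
        final int availableProcessors;
        final double currentCpuLoad; // 0.0 .. 1.0

        Node(String id, int availableProcessors, double currentCpuLoad) {
            this.id = id;
            this.availableProcessors = availableProcessors;
            this.currentCpuLoad = currentCpuLoad;
        }
    }

    /** Returns ids of qualifying nodes, or only the local node id if none qualify. */
    static List<String> selectTargets(List<Node> subgrid, String localNodeId) {
        List<String> targets = new ArrayList<String>();
        for (Node node : subgrid) {
            if (node.availableProcessors > 1 && node.currentCpuLoad < 0.5) {
                targets.add(node.id);
            }
        }
        // Fallback: run the single job locally when no node qualifies.
        if (targets.isEmpty()) {
            targets.add(localNodeId);
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Node> mixed = Arrays.asList(new Node("a", 4, 0.2), new Node("b", 8, 0.9), new Node("c", 2, 0.4));
        List<Node> busy = Arrays.asList(new Node("a", 1, 0.1), new Node("b", 8, 0.9));
        System.out.println(selectTargets(mixed, "local")); // [a, c]
        System.out.println(selectTargets(busy, "local"));  // [local]
    }
}
```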
Full Source Code
package org.gridgain.examples.metrics;

import org.gridgain.grid.*;
import org.gridgain.grid.logger.*;
import org.gridgain.grid.resources.*;
import java.util.*;
import java.io.*;

public class GridMetricsTask extends GridTaskAdapter<Object, Object> {
    /** Injected grid logger. */
    @GridLoggerResource
    private GridLogger log = null;

    /** Injected grid instance. */
    @GridInstanceResource
    private Grid grid = null;

    /**
     * This task creates jobs and sends them only to nodes that
     * have more than 1 processor and a CPU load below 50%.
     * <p>
     * If none of the nodes meet the criteria above, the job is
     * executed locally.
     *
     * @param subgrid Task node topology.
     * @param arg Task argument (ignored for this example).
     * @return {@link GridJob} instances mapped to nodes for execution.
     * @throws GridException If map operation failed.
     */
    public Map<? extends GridJob, GridNode> map(List<GridNode> subgrid, Object arg) throws GridException {
        Map<GridJobAdapter<Serializable>, GridNode> jobs =
            new HashMap<GridJobAdapter<Serializable>, GridNode>(subgrid.size());

        for (GridNode node : subgrid) {
            // Get metrics for given node.
            GridNodeMetrics metrics = node.getMetrics();

            log.info("Checking node metrics [nodeId=" + node.getId() +
                ", cpuLoad=" + metrics.getCurrentCpuLoad() +
                ", availableProcessors=" + metrics.getAvailableProcessors() + ']');

            // For the sake of this example, we only send a job to a node
            // if it has more than one processor and its CPU is less than 50% loaded.
            if (metrics.getAvailableProcessors() > 1 && metrics.getCurrentCpuLoad() < 0.5) {
                jobs.put(new GridMetricsJob(), node);
            }
        }

        // If no node qualified for job execution because either the
        // number of processors was 1 or the CPU load was 50% or higher,
        // then execute the job locally.
        if (jobs.isEmpty()) {
            jobs.put(new GridMetricsJob(), grid.getLocalNode());
        }

        return jobs;
    }

    /**
     * {@inheritDoc}
     */
    public Object reduce(List<GridJobResult> results) throws GridException {
        // Nothing to reduce.
        return null;
    }

    /**
     * Example job to demonstrate node metrics usage.
     * The execution simply prints out the metrics for
     * the local node.
     *
     * @author morpheus
     * @version 2.0
     */
    private static class GridMetricsJob extends GridJobAdapter<Serializable> {
        /** Injected grid instance. */
        @GridInstanceResource
        private Grid grid = null;

        /** Injected grid logger. */
        @GridLoggerResource
        private GridLogger log = null;

        /**
         * For the purpose of this example, we simply print out metrics
         * for the node this job is running on.
         *
         * @return <tt>null</tt> as the job simply prints out metrics on
         *      the local node.
         */
        public Serializable execute() {
            // Simply print out metrics for the node this job is running on
            // and return nothing.
            log.info("Printing node metrics from grid job [nodeId=" + grid.getLocalNode().getId() +
                ", nodeMetrics=" + grid.getLocalNode().getMetrics() + ']');

            // Nothing to return.
            return null;
        }
    }
}
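To see the overall flow in one place, here is a single-JVM sketch of the task lifecycle. map assigns one job per node, each job runs, and reduce receives the list of results. TaskLifecycleSketch is hypothetical and does not use GridGain's API. Its jobs run sequentially, whereas the grid runs each job on its assigned node. GridMetricsTask.reduce() returns null; this reduce counts the results instead, so its output can be checked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical single-JVM sketch of map -> execute -> reduce. */
class TaskLifecycleSketch {
    /** map: one job per target node id; like Map<GridJob, GridNode>, jobs are the keys. */
    static Map<Supplier<String>, String> map(List<String> targetNodeIds) {
        Map<Supplier<String>, String> jobs = new LinkedHashMap<Supplier<String>, String>();
        for (String nodeId : targetNodeIds) {
            // Each job just reports which node it was assigned to.
            jobs.put(() -> "ran on " + nodeId, nodeId);
        }
        return jobs;
    }

    /** Runs every mapped job and collects its result in mapping order. */
    static List<String> execute(Map<Supplier<String>, String> jobs) {
        List<String> results = new ArrayList<String>();
        for (Supplier<String> job : jobs.keySet()) {
            results.add(job.get());
        }
        return results;
    }

    /** reduce: aggregates the job results into one value (here, a count). */
    static int reduce(List<String> results) {
        return results.size();
    }

    public static void main(String[] args) {
        List<String> results = execute(map(Arrays.asList("node-a", "node-c")));
        System.out.println(results);         // [ran on node-a, ran on node-c]
        System.out.println(reduce(results)); // 2
    }
}
```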
