Example task that demonstrate usage of JBoss Cache together with GridGain. This task receives a list of cache keys as an argument and then creates a job for every cache key to be sent for remote execution. Each job will attempt to get data from cache, and if it is not there, then put it into cache.
If this task is run with same list of keys multiple times, then after first execution it will store all keys in the cache (since no data was in the cache). Then, on consecutive executions data will be retrieved from cache since it was already cached during first execution.
Note that since we are using GridGain GridAffinityLoadBalancingSpi ![]()
Full Source Code
package org.gridgain.examples.jbosscache; import org.gridgain.grid.*; import org.gridgain.grid.logger.*; import org.gridgain.grid.resources.*; import org.gridgain.grid.spi.loadbalancing.affinity.*; import java.util.*; import java.io.*; public class GridJbossCacheExampleTask extends GridTaskSplitAdapter<List<Integer>, Map<Integer, UUID>> { /** Injected logger (for demonstration purpose). */ @GridLoggerResource private GridLogger log = null; /** Injected local node ID. */ @GridLocalNodeIdResource private UUID locNodeId = null; /** * For every cache key passed in, this method will create an instance of {@link GridAffinityJob}. * The implementation then will route the jobs using {@link GridAffinityLoadBalancingSpi} to * the node responsible for caching job's key. * * @param gridSize Size of the grid. * @param arg List of cache keys. * @return List of affinity jobs. * @see GridTaskSplitAdapter */ @Override protected Collection<? extends GridJob> split(int gridSize, List<Integer> arg) { assert arg != null; List<GridAffinityJob<Integer>> jobs = new ArrayList<GridAffinityJob<Integer>>(); for (Integer cacheKey : arg) { jobs.add(new GridAffinityJobAdapter<Integer, Serializable>(cacheKey) { /** * This jobs will: * 1. Receive cache key as an argument. * 2. Try to accessed data by this cache key from cache. * 3. If data is not there, put data into cache. * 4. Return cache key mapped to node this job executed on. */ public Serializable execute() throws GridException { // We simply use job argument as cache key. This // must be the same key returned by 'getAffinityKey()' // method above. int cacheKey = getAffinityKey(); GridJbossCacheManager dataMgr = GridJbossCacheManager.getInstance(); String cacheVal = dataMgr.getDataFromCache(cacheKey); // For the sake of this example, we simply check if // data is stored in cache, and if not, we put it into // cache. In real life you would most likely use CacheLoader // from JBoss Cache to load data into cache. if (cacheVal == null) { cacheVal = "Cached value: " + cacheKey; dataMgr.putDataToCache(cacheKey, cacheVal); log.info("Put data into cache [cacheKey=" + cacheKey + ", cacheVal=" + cacheVal + ", nodeId=" + locNodeId + ']'); } else { log.info("Retrieved data from cache [cacheKey=" + cacheKey + ", cacheVal=" + cacheVal + ", nodeId=" + locNodeId + ']'); } // We return cache-key/local-node-ID pair to indicate // on which node a job has executed. return (Serializable)Collections.singletonMap(cacheKey, locNodeId); } }); } // In the adapter implementation, GridGain uses // Affinity Load Balancer to map jobs to nodes. // Job with the same affinity key will always be // mapped to the same node. return jobs; } /** * Aggregate results from all jobs. * * @param results Results from all jobs. * @return Aggregated result which in this case contains every cache key * accessed from every job mapped to the ID of the node it was * accessed on. */ public Map<Integer, UUID> reduce(List<GridJobResult> results) { // Map of cache keys mapped to the nodes they were accessed on. Map<Integer, UUID> keyNodeMap = new HashMap<Integer, UUID>(); for (GridJobResult res : results) { Map<Integer, UUID> jobRes = res.getData(); assert jobRes != null; // All jobs return a map of size 1, with cache key // mapped to node ID the job executed on. assert jobRes.size() == 1; keyNodeMap.putAll(jobRes); } return keyNodeMap; } }
