/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.query.h2;

import java.lang.reflect.Array;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.H2DmlPlanKey;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArguments;
import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.h2.command.Prepared;
import org.h2.command.dml.Delete;
import org.h2.command.dml.Insert;
import org.h2.command.dml.Merge;
import org.h2.command.dml.Update;
import org.h2.table.Column;
import org.h2.util.DateTimeUtils;
import org.h2.util.LocalDateTimeUtils;
import org.h2.value.Value;
import org.h2.value.ValueDate;
import org.h2.value.ValueTime;
import org.h2.value.ValueTimestamp;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class DmlStatementsProcessor {
    /** Default number of attempts for a DML statement; equals the iteration bound of the retry loop in {@code updateSqlFields}. */
    private static final int DFLT_DML_RERUN_ATTEMPTS = 4;

    /** Indexing instance used to run the SELECT part of DML statements; assigned in {@code start}. */
    private IgniteH2Indexing idx;

    /** Logger; assigned in {@code start}. */
    private IgniteLogger log;

    /** Capacity passed to the bounded update plan cache. */
    private static final int PLAN_CACHE_SIZE = 1024;

    /**
     * Update plans keyed by schema name and SQL text. Only plans built without an error keys position
     * are stored here (see {@code getPlanForStatement}).
     */
    private final ConcurrentMap<H2DmlPlanKey, UpdatePlan> planCache = new GridBoundedConcurrentLinkedHashMap<>(PLAN_CACHE_SIZE);

    /** Entry modifier that removes the entry it is applied to; used by DELETE. */
    private static final IgniteInClosure<MutableEntry<Object, Object>> RMV = new IgniteInClosure<MutableEntry<Object, Object>>(){

        @Override
        public void apply(MutableEntry<Object, Object> e) {
            e.remove();
        }
    };

    /**
     * Stores the indexing instance and obtains a logger for this class from the kernal context.
     *
     * @param ctx Kernal context the logger is requested from.
     * @param idx Indexing instance kept for later query execution.
     */
    public void start(GridKernalContext ctx, IgniteH2Indexing idx) {
        this.idx = idx;

        IgniteLogger procLog = ctx.log(DmlStatementsProcessor.class);

        this.log = procLog;
    }

    /**
     * Evicts every cached update plan whose target table belongs to the given cache.
     *
     * @param cacheName Name of the cache being stopped.
     */
    public void onCacheStop(String cacheName) {
        // The iterator must be typed: with a raw Iterator, next() yields Object and getValue() does not resolve.
        Iterator<Map.Entry<H2DmlPlanKey, UpdatePlan>> iter = this.planCache.entrySet().iterator();

        while (iter.hasNext()) {
            UpdatePlan plan = iter.next().getValue();

            if (F.eq(cacheName, plan.tbl.cacheName())) {
                iter.remove();
            }
        }
    }

    /**
     * Executes a DML statement, re-running it while the previous attempt reports keys that could not be
     * processed, up to {@link #DFLT_DML_RERUN_ATTEMPTS} attempts in total.
     *
     * @param schemaName Schema name.
     * @param stmt Prepared DML statement.
     * @param fieldsQry Original query carrying arguments and execution flags.
     * @param loc Whether the query is executed locally.
     * @param filters Filters passed to the local SELECT; may be {@code null}.
     * @param cancel Query cancel handle.
     * @return Accumulated number of affected items together with the keys left unprocessed by the last attempt.
     * @throws IgniteCheckedException If execution fails.
     */
    private UpdateResult updateSqlFields(String schemaName, PreparedStatement stmt, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
        Object[] errKeys = null;
        long items = 0L;

        UpdatePlan plan = this.getPlanForStatement(schemaName, stmt, null);
        GridCacheContext<?, ?> cctx = plan.tbl.rowDescriptor().context();

        for (int i = 0; i < DFLT_DML_RERUN_ATTEMPTS; ++i) {
            UpdateResult r;
            CacheOperationContext opCtx = cctx.operationContextPerCall();

            // With the binary marshaller, install a per-call context obtained via keepBinary()
            // (or a fresh one when none is set) for the duration of this attempt.
            if (cctx.binaryMarshaller()) {
                CacheOperationContext newOpCtx = null;

                if (opCtx == null) {
                    // NOTE(review): the third constructor argument is presumably the keep-binary flag - confirm.
                    newOpCtx = new CacheOperationContext(false, null, true, null, false, null, false);
                } else if (!opCtx.isKeepBinary()) {
                    newOpCtx = opCtx.keepBinary();
                }

                if (newOpCtx != null) {
                    cctx.operationContextPerCall(newOpCtx);
                }
            }

            try {
                r = this.executeUpdateStatement(schemaName, cctx, stmt, fieldsQry, loc, filters, cancel, errKeys);
            }
            finally {
                // Always restore whatever per-call context was in place before this attempt.
                cctx.operationContextPerCall(opCtx);
            }

            items += r.cnt;
            errKeys = r.errKeys;

            // Nothing left to retry.
            if (F.isEmpty((Object[])errKeys)) {
                break;
            }
        }

        // Reuse the shared constants for the common "one row" and "no rows" outcomes.
        if (F.isEmpty(errKeys)) {
            if (items == 1L) {
                return UpdateResult.ONE;
            }
            if (items == 0L) {
                return UpdateResult.ZERO;
            }
        }

        return new UpdateResult(items, errKeys);
    }

    /**
     * Runs a DML statement in non-local mode and wraps the affected item count into a single-row cursor.
     *
     * @param schemaName Schema name.
     * @param stmt Prepared DML statement.
     * @param fieldsQry Original query.
     * @param cancel Query cancel handle.
     * @return Cursor holding one row with one column - the update counter.
     * @throws IgniteCheckedException If execution fails.
     */
    QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, PreparedStatement stmt, SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException {
        UpdateResult updRes = this.updateSqlFields(schemaName, stmt, fieldsQry, false, null, cancel);

        List<Long> cntRow = Collections.singletonList(updRes.cnt);

        QueryCursorImpl cntCursor = new QueryCursorImpl(Collections.singletonList(cntRow), cancel, false);

        cntCursor.fieldsMeta(IgniteH2Indexing.UPDATE_RESULT_META);

        return cntCursor;
    }

    /**
     * Runs a DML statement in local mode and exposes the affected item count as a single-row fields result.
     *
     * @param schemaName Schema name.
     * @param stmt Prepared DML statement.
     * @param fieldsQry Original query.
     * @param filters Filters passed to the local SELECT.
     * @param cancel Query cancel handle.
     * @return Fields result whose only row contains the update counter.
     * @throws IgniteCheckedException If execution fails.
     */
    GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, PreparedStatement stmt, SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
        UpdateResult updRes = this.updateSqlFields(schemaName, stmt, fieldsQry, true, filters, cancel);

        List<Long> cntRow = Collections.singletonList(updRes.cnt);

        GridCloseableIterator rowIter = (GridCloseableIterator)new IgniteSingletonIterator(cntRow);

        return new GridQueryFieldsResultAdapter(IgniteH2Indexing.UPDATE_RESULT_META, rowIter);
    }

    /**
     * Feeds the rows of a tuple based INSERT statement into a data streamer.
     *
     * @param streamer Streamer receiving the key-value pairs; its cache must be the statement's target cache.
     * @param stmt Prepared INSERT statement.
     * @param args Statement arguments; {@code null} is treated as an empty array.
     * @return Number of entries handed to the streamer.
     * @throws IgniteCheckedException If row evaluation or conversion fails.
     * @throws IgniteSQLException If the statement targets another cache or is not a tuple based INSERT.
     */
    long streamUpdateQuery(IgniteDataStreamer streamer, PreparedStatement stmt, Object[] args) throws IgniteCheckedException {
        args = U.firstNotNull(args, X.EMPTY_OBJECT_ARRAY);

        Prepared prepared = GridSqlQueryParser.prepared(stmt);

        assert prepared != null;

        UpdatePlan plan = UpdatePlanBuilder.planForStatement(prepared, null);

        String targetCache = plan.tbl.rowDescriptor().context().name();

        if (!F.eq((Object)streamer.cacheName(), (Object)targetCache)) {
            throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly in connection options", 1002);
        }

        boolean tupleInsert = plan.mode == UpdateMode.INSERT && plan.rowsNum > 0;

        if (!tupleInsert) {
            throw new IgniteSQLException("Only tuple based INSERT statements are supported in streaming mode", 1002);
        }

        assert plan.isLocSubqry;

        final GridCacheContext cctx = plan.tbl.rowDescriptor().context();
        final List<List<?>> buffered = new ArrayList<List<?>>(plan.rowsNum);

        // Evaluate the plan's SELECT locally to materialize the tuples being inserted.
        final GridQueryFieldsResult selectRes = this.idx.queryLocalSqlFields(this.idx.schema(cctx.name()), plan.selectQry, F.asList((Object[])args), null, false, 0, null);

        QueryCursorImpl<List<?>> selectCur = new QueryCursorImpl<List<?>>(new Iterable<List<?>>(){

            @Override
            public Iterator<List<?>> iterator() {
                try {
                    return new GridQueryCacheObjectsIterator((Iterator)selectRes.iterator(), DmlStatementsProcessor.this.idx.objectContext(), cctx.keepBinary());
                }
                catch (IgniteCheckedException e) {
                    throw new IgniteException((Throwable)e);
                }
            }
        }, null);

        buffered.addAll(selectCur.getAll());

        // Second cursor simply replays the rows collected above.
        QueryCursorImpl<List<?>> replayCur = new QueryCursorImpl<List<?>>(new Iterable<List<?>>(){

            @Override
            public Iterator<List<?>> iterator() {
                return buffered.iterator();
            }
        }, null);

        if (plan.rowsNum == 1) {
            IgniteBiTuple<?, ?> single = this.rowToKeyValue(cctx, replayCur.iterator().next(), plan);

            streamer.addData(single.getKey(), single.getValue());

            return 1L;
        }

        Map<Object, Object> entries = new LinkedHashMap<Object, Object>(plan.rowsNum);

        for (List<?> tuple : replayCur) {
            IgniteBiTuple<?, ?> pair = this.rowToKeyValue(cctx, tuple, plan);

            entries.put(pair.getKey(), pair.getValue());
        }

        streamer.addData(entries);

        return entries.size();
    }

    /**
     * Performs a single attempt of a DML statement: either takes the fast path described by the plan or
     * runs the plan's SELECT and applies its rows according to the plan's mode.
     *
     * @param schemaName Schema name.
     * @param cctx Cache context of the target cache.
     * @param prepStmt Prepared DML statement.
     * @param fieldsQry Original query carrying arguments and execution flags.
     * @param loc Whether the query is executed locally.
     * @param filters Filters for the local SELECT.
     * @param cancel Query cancel handle.
     * @param failedKeys Keys reported as unprocessed by a previous attempt; only checked by an assertion here.
     * @return Result of this attempt.
     * @throws IgniteCheckedException If execution fails.
     */
    private UpdateResult executeUpdateStatement(String schemaName, GridCacheContext cctx, PreparedStatement prepStmt, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel, Object[] failedKeys) throws IgniteCheckedException {
        int mainCacheId = CU.cacheId((String)cctx.name());

        // No error keys position is ever supplied here, so the plan always goes through the plan cache.
        Integer errKeysPos = null;

        UpdatePlan plan = this.getPlanForStatement(schemaName, prepStmt, errKeysPos);

        if (plan.fastUpdateArgs != null) {
            assert F.isEmpty((Object[])failedKeys) && errKeysPos == null;

            return DmlStatementsProcessor.doFastUpdate(plan, fieldsQry.getArgs());
        }

        assert !F.isEmpty((String)plan.selectQry);

        boolean distributedSelect = !loc && !plan.isLocSubqry;

        QueryCursorImpl rowsCur;

        if (distributedSelect) {
            SqlFieldsQuery selectQry = new SqlFieldsQuery(plan.selectQry, fieldsQry.isCollocated())
                .setArgs(fieldsQry.getArgs())
                .setDistributedJoins(fieldsQry.isDistributedJoins())
                .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
                .setLocal(fieldsQry.isLocal())
                .setPageSize(fieldsQry.getPageSize())
                .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);

            rowsCur = (QueryCursorImpl)this.idx.queryDistributedSqlFields(schemaName, selectQry, true, cancel, mainCacheId);
        } else {
            final GridQueryFieldsResult locRes = this.idx.queryLocalSqlFields(schemaName, plan.selectQry, F.asList((Object[])fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);

            rowsCur = new QueryCursorImpl(new Iterable<List<?>>(){

                @Override
                public Iterator<List<?>> iterator() {
                    try {
                        return new GridQueryCacheObjectsIterator((Iterator)locRes.iterator(), DmlStatementsProcessor.this.idx.objectContext(), true);
                    }
                    catch (IgniteCheckedException e) {
                        throw new IgniteException((Throwable)e);
                    }
                }
            }, cancel);
        }

        // Local execution passes 0 as the page size to the row appliers.
        int pageSize;

        if (loc) {
            pageSize = 0;
        } else {
            pageSize = fieldsQry.getPageSize();
        }

        switch (plan.mode) {
            case MERGE: {
                long merged = this.doMerge(plan, (Iterable<List<?>>)rowsCur, pageSize);

                return new UpdateResult(merged, X.EMPTY_OBJECT_ARRAY);
            }
            case INSERT: {
                long inserted = this.doInsert(plan, (Iterable<List<?>>)rowsCur, pageSize);

                return new UpdateResult(inserted, X.EMPTY_OBJECT_ARRAY);
            }
            case UPDATE: {
                return this.doUpdate(plan, (Iterable<List<?>>)rowsCur, pageSize);
            }
            case DELETE: {
                return this.doDelete(cctx, (Iterable<List<?>>)rowsCur, pageSize);
            }
        }

        throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode + ']', 2001);
    }

    /**
     * Returns an update plan for the statement, consulting the plan cache when no error keys position is given.
     *
     * @param schema Schema name; part of the cache key.
     * @param prepStmt Prepared DML statement.
     * @param errKeysPos Error keys position handed to the plan builder; a non-null value bypasses the cache.
     * @return Cached or freshly built update plan.
     * @throws IgniteCheckedException If the plan cannot be built.
     */
    private UpdatePlan getPlanForStatement(String schema, PreparedStatement prepStmt, @Nullable Integer errKeysPos) throws IgniteCheckedException {
        Prepared prepared = GridSqlQueryParser.prepared(prepStmt);
        H2DmlPlanKey cacheKey = new H2DmlPlanKey(schema, prepared.getSQL());

        boolean cacheable = errKeysPos == null;

        if (cacheable) {
            UpdatePlan cached = (UpdatePlan)this.planCache.get(cacheKey);

            if (cached != null) {
                return cached;
            }
        }

        UpdatePlan built = UpdatePlanBuilder.planForStatement(prepared, errKeysPos);

        if (!cacheable) {
            return built;
        }

        // Prefer a plan stored concurrently under the same key over the one just built.
        UpdatePlan existing = this.planCache.putIfAbsent(cacheKey, built);

        return (UpdatePlan)U.firstNotNull((Object[])new UpdatePlan[]{existing, built});
    }

    /**
     * Executes a single-key UPDATE or DELETE directly through the cache API, without running a SELECT.
     *
     * @param plan Update plan whose {@code fastUpdateArgs} are set.
     * @param args Query arguments the key/value extractors are applied to.
     * @return {@link UpdateResult#ONE} if the cache operation reported success, {@link UpdateResult#ZERO} otherwise.
     * @throws IgniteCheckedException If the cache operation fails.
     */
    private static UpdateResult doFastUpdate(UpdatePlan plan, Object[] args) throws IgniteCheckedException {
        GridCacheContext cctx = plan.tbl.rowDescriptor().context();

        FastUpdateArguments fastArgs = plan.fastUpdateArgs;

        assert fastArgs != null;

        boolean hasOldVal = fastArgs.val != FastUpdateArguments.NULL_ARGUMENT;
        boolean hasNewVal = fastArgs.newVal != FastUpdateArguments.NULL_ARGUMENT;

        Object key = fastArgs.key.apply(args);

        boolean applied;

        if (hasNewVal) {
            // A new value is present: this is a replace, conditional on the old value when one is bound.
            Object newVal = fastArgs.newVal.apply(args);

            if (hasOldVal) {
                Object oldVal = fastArgs.val.apply(args);

                applied = cctx.cache().replace(key, oldVal, newVal);
            } else {
                applied = cctx.cache().replace(key, newVal);
            }
        } else {
            // No new value: this is a remove, conditional on the old value when one is bound.
            Object oldVal = fastArgs.val.apply(args);

            if (hasOldVal) {
                applied = cctx.cache().remove(key, oldVal);
            } else {
                applied = cctx.cache().remove(key);
            }
        }

        if (applied) {
            return UpdateResult.ONE;
        }

        return UpdateResult.ZERO;
    }

    /**
     * Removes the entries listed by the cursor, each only if its current value still equals the selected one.
     *
     * @param cctx Cache context.
     * @param cursor Rows of exactly two elements: key and the value seen by the SELECT.
     * @param pageSize Batch size handed to the batch sender.
     * @return Number of processed items plus the keys whose values did not match.
     * @throws IgniteCheckedException If sending a batch fails.
     * @throws IgniteSQLException If processing of any key raised an exception.
     */
    private UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
        BatchSender batchSender = new BatchSender(cctx, pageSize);

        for (List<?> row : cursor) {
            if (row.size() == 2) {
                batchSender.add(row.get(0), new ModifyingEntryProcessor(row.get(1), RMV));
            } else {
                // Malformed rows are reported and skipped rather than failing the whole statement.
                U.warn((IgniteLogger)this.log, (Object)("Invalid row size on DELETE - expected 2, got " + row.size()));
            }
        }

        batchSender.flush();

        SQLException failure = batchSender.error();

        if (failure == null) {
            return new UpdateResult(batchSender.updateCount(), batchSender.failedKeys().toArray());
        }

        if (!F.isEmpty(batchSender.failedKeys())) {
            // Put the concurrent modification report in front of the per-key processing errors.
            String msg = "Failed to DELETE some keys because they had been modified concurrently [keys=" + batchSender.failedKeys() + ']';

            SQLException concurrentEx = IgniteQueryErrorCode.createJdbcSqlException((String)msg, (int)4002);

            concurrentEx.setNextException(failure);

            failure = concurrentEx;
        }

        throw new IgniteSQLException(failure);
    }

    /**
     * Applies UPDATE rows: for every row builds the new value and replaces the entry, but only if its
     * current value still equals the value seen by the SELECT.
     *
     * @param plan Update plan.
     * @param cursor Rows laid out as key, current value, then the new column values.
     * @param pageSize Batch size handed to the batch sender.
     * @return Number of processed items plus the keys whose values did not match.
     * @throws IgniteCheckedException If conversion or sending a batch fails.
     * @throws IgniteSQLException If a new value is {@code null} or processing of any key raised an exception.
     */
    private UpdateResult doUpdate(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
        GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
        GridCacheContext<?, ?> cctx = desc.context();

        boolean binary = cctx.binaryMarshaller();

        String[] updatedCols = plan.colNames;
        int valColIdx = plan.valColIdx;

        boolean wholeValGiven = valColIdx != -1;
        boolean propsGiven = !wholeValGiven || updatedCols.length > 1;

        BatchSender batchSender = new BatchSender(cctx, pageSize);

        for (List<?> row : cursor) {
            Object key = row.get(0);

            HashMap<String, Object> colVals = new HashMap<String, Object>();

            // New column values start at row index 2 (after key and current value).
            for (int c = 0; c < plan.colNames.length; ++c) {
                boolean isValCol = wholeValGiven && c == valColIdx - 2;

                if (isValCol) {
                    continue;
                }

                String name = plan.colNames[c];

                GridQueryProperty prop = plan.tbl.rowDescriptor().type().property(name);

                assert prop != null;

                colVals.put(name, DmlStatementsProcessor.convert(row.get(c + 2), desc, prop.type(), plan.colTypes[c]));
            }

            Object newVal = plan.valSupplier.apply(row);

            if (newVal == null) {
                throw new IgniteSQLException("New value for UPDATE must not be null", 4004);
            }

            // Walk the table columns past the first three and push the new values into the value object.
            for (int colIdx = 3; colIdx < plan.tbl.getColumns().length; ++colIdx) {
                Column col = plan.tbl.getColumn(colIdx);

                if (desc.isKeyValueOrVersionColumn(col.getColumnId())) {
                    continue;
                }

                GridQueryProperty prop = desc.type().property(col.getName());

                if (prop.key()) {
                    continue;
                }

                // containsKey() is required: a present column may legitimately map to null.
                if (!colVals.containsKey(col.getName())) {
                    continue;
                }

                Object colVal = colVals.get(col.getName());

                desc.setColumnValue(null, newVal, colVal, colIdx - 3);
            }

            if (binary && propsGiven) {
                assert newVal instanceof BinaryObjectBuilder;

                newVal = ((BinaryObjectBuilder)newVal).build();
            }

            Object expectedVal = row.get(1);

            if (binary && !(expectedVal instanceof BinaryObject)) {
                expectedVal = cctx.grid().binary().toBinary(expectedVal);
            }

            batchSender.add(key, new ModifyingEntryProcessor(expectedVal, new EntryValueUpdater(newVal)));
        }

        batchSender.flush();

        SQLException failure = batchSender.error();

        if (failure == null) {
            return new UpdateResult(batchSender.updateCount(), batchSender.failedKeys().toArray());
        }

        if (!F.isEmpty(batchSender.failedKeys())) {
            String msg = "Failed to UPDATE some keys because they had been modified concurrently [keys=" + batchSender.failedKeys() + ']';

            SQLException concurrentEx = IgniteQueryErrorCode.createJdbcSqlException((String)msg, (int)4002);

            concurrentEx.setNextException(failure);

            failure = concurrentEx;
        }

        throw new IgniteSQLException(failure);
    }

    /**
     * Converts a value produced by the SELECT part of a DML statement to the class expected by the
     * target property.
     *
     * @param val Source value; {@code null} is returned unchanged.
     * @param desc Row descriptor supplying the marshaller, the configuration and the generic H2 conversion.
     * @param expCls Class expected by the target property.
     * @param type H2 value type code of the column (one of the {@link Value} type constants).
     * @return Converted value.
     * @throws IgniteCheckedException If an array of an unexpected type is given or conversion fails.
     */
    private static Object convert(Object val, GridH2RowDescriptor desc, Class<?> expCls, int type) throws IgniteCheckedException {
        if (val == null) {
            return null;
        }

        Class<?> currCls = val.getClass();

        // A java.util.Date subclass where plain java.util.Date is expected: copy into a plain Date.
        if (val instanceof Date && currCls != Date.class && expCls == Date.class) {
            return new Date(((Date)val).getTime());
        }

        // UUID-typed column delivered as a byte array: unmarshal the bytes with the cache marshaller.
        if (type == Value.UUID && currCls == byte[].class) {
            return U.unmarshal(desc.context().marshaller(), (byte[])val, U.resolveClassLoader(desc.context().gridConfig()));
        }

        if (LocalDateTimeUtils.isJava8DateApiPresent()) {
            if (val instanceof Timestamp && LocalDateTimeUtils.isLocalDateTime(expCls)) {
                return LocalDateTimeUtils.valueToLocalDateTime(ValueTimestamp.get((Timestamp)val));
            }

            if (val instanceof Date && LocalDateTimeUtils.isLocalDate(expCls)) {
                return LocalDateTimeUtils.valueToLocalDate(ValueDate.fromDateValue(DateTimeUtils.dateValueFromDate(((Date)val).getTime())));
            }

            if (val instanceof Time && LocalDateTimeUtils.isLocalTime(expCls)) {
                return LocalDateTimeUtils.valueToLocalTime(ValueTime.get((Time)val));
            }
        }

        // Array column whose runtime class differs from the expected one: only Object[] can be re-typed.
        if (type == Value.ARRAY && currCls != expCls) {
            if (currCls != Object[].class) {
                throw new IgniteCheckedException("Unexpected array type - only conversion from Object[] is assumed");
            }

            assert expCls.isArray();

            Object[] curr = (Object[])val;

            Object newArr = Array.newInstance(expCls.getComponentType(), curr.length);

            System.arraycopy(curr, 0, newArr, 0, curr.length);

            return newArr;
        }

        return H2Utils.convert(val, desc, type);
    }

    /**
     * Separates {@code invokeAll} results into keys whose processor completed and keys whose processor threw.
     *
     * @param res Entry processor results by key.
     * @return Keys whose result was obtained without an exception, the head of a chain of SQL exceptions
     *      (one per failed key, {@code null} if none failed) and the number of failed keys.
     */
    private static PageProcessingErrorResult splitErrors(Map<Object, EntryProcessorResult<Boolean>> res) {
        // Start from all keys and drop the ones that turn out to have thrown.
        LinkedHashSet<Object> remainingKeys = new LinkedHashSet<Object>(res.keySet());

        SQLException chainHead = null;
        SQLException chainTail = null;

        int failedCnt = 0;

        for (Map.Entry<Object, EntryProcessorResult<Boolean>> entry : res.entrySet()) {
            Object key = entry.getKey();

            try {
                entry.getValue().get();
            }
            catch (EntryProcessorException ex) {
                SQLException link = IgniteQueryErrorCode.createJdbcSqlException((String)("Failed to process key '" + key + '\''), (int)4005);

                link.initCause(ex);

                if (chainTail == null) {
                    chainHead = link;
                } else {
                    chainTail.setNextException(link);
                }

                chainTail = link;

                remainingKeys.remove(key);

                failedCnt++;
            }
        }

        return new PageProcessingErrorResult(remainingKeys.toArray(), chainHead, failedCnt);
    }

    /**
     * Applies MERGE rows by putting the resulting key-value pairs into the cache.
     *
     * @param plan Update plan.
     * @param cursor Rows to merge.
     * @param pageSize Number of rows per {@code putAll} call; a non-positive value puts all rows at once.
     * @return Number of entries put into the cache.
     * @throws IgniteCheckedException If row conversion or a cache operation fails.
     */
    private long doMerge(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
        GridH2RowDescriptor desc = plan.tbl.rowDescriptor();

        // Raw type on purpose: with wildcard type arguments put(Object, Object) would not type-check.
        GridCacheContext cctx = desc.context();

        if (plan.rowsNum == 1) {
            IgniteBiTuple<?, ?> t = this.rowToKeyValue(cctx, cursor.iterator().next(), plan);

            cctx.cache().put(t.getKey(), t.getValue());

            return 1L;
        }

        // Accumulate in a long: the method returns long and an int counter could overflow.
        long resCnt = 0L;

        Map<Object, Object> rows = new LinkedHashMap<Object, Object>();

        Iterator<List<?>> it = cursor.iterator();

        while (it.hasNext()) {
            List<?> row = it.next();

            IgniteBiTuple<?, ?> t = this.rowToKeyValue(cctx, row, plan);

            rows.put(t.getKey(), t.getValue());

            boolean pageFull = pageSize > 0 && rows.size() == pageSize;

            // Flush when the page is full or when this was the last row.
            if (pageFull || !it.hasNext()) {
                cctx.cache().putAll(rows);

                resCnt += rows.size();

                if (it.hasNext()) {
                    rows.clear();
                }
            }
        }

        return resCnt;
    }

    /**
     * Applies INSERT rows; a key that already exists in the cache is reported as a duplicate.
     *
     * @param plan Update plan.
     * @param cursor Rows to insert.
     * @param pageSize Batch size handed to the batch sender.
     * @return Number of inserted entries.
     * @throws IgniteCheckedException If row conversion or sending a batch fails.
     * @throws IgniteSQLException If any key already exists or processing of any key raised an exception.
     */
    private long doInsert(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
        GridH2RowDescriptor rowDesc = plan.tbl.rowDescriptor();
        GridCacheContext cctx = rowDesc.context();

        // Single-row plan: a plain putIfAbsent is enough.
        if (plan.rowsNum == 1) {
            IgniteBiTuple<?, ?> single = this.rowToKeyValue(cctx, cursor.iterator().next(), plan);

            boolean inserted = cctx.cache().putIfAbsent(single.getKey(), single.getValue());

            if (!inserted) {
                throw new IgniteSQLException("Duplicate key during INSERT [key=" + single.getKey() + ']', 4001);
            }

            return 1L;
        }

        BatchSender batchSender = new BatchSender(cctx, pageSize);

        for (List<?> row : cursor) {
            IgniteBiTuple<?, ?> pair = this.rowToKeyValue(cctx, row, plan);

            batchSender.add(pair.getKey(), new InsertEntryProcessor(pair.getValue()));
        }

        batchSender.flush();

        SQLException failure = batchSender.error();

        if (!F.isEmpty(batchSender.failedKeys())) {
            String msg = "Failed to INSERT some keys because they are already in cache [keys=" + batchSender.failedKeys() + ']';

            SQLException dupEx = new SQLException(msg, null, 4001);

            // Processing errors, if any, stay first; the duplicate report is linked after them.
            if (failure != null) {
                failure.setNextException(dupEx);
            } else {
                failure = dupEx;
            }
        }

        if (failure == null) {
            return batchSender.updateCount();
        }

        throw new IgniteSQLException(failure);
    }

    /**
     * Invokes the given entry processors on the cache and summarizes the outcome of the page.
     *
     * @param cctx Cache context.
     * @param rows Entry processors by key.
     * @return Count of rows that produced neither a result nor an exception, the keys that produced a
     *      result, and the chained exceptions of the keys that failed.
     * @throws IgniteCheckedException If the cache invocation fails.
     */
    private static PageProcessingResult processPage(GridCacheContext cctx, Map<Object, EntryProcessor<Object, Object, Boolean>> rows) throws IgniteCheckedException {
        Map invokeRes = cctx.cache().invokeAll(rows, new Object[0]);

        int total = rows.size();

        // An empty result map means no key reported anything, so the whole page counts.
        if (F.isEmpty((Map)invokeRes)) {
            return new PageProcessingResult(total, null, null);
        }

        PageProcessingErrorResult split = DmlStatementsProcessor.splitErrors(invokeRes);

        int reportedKeys = split.errKeys.length;

        return new PageProcessingResult(total - reportedKeys - split.cnt, split.errKeys, split.ex);
    }

    /**
     * Turns a row of an INSERT or MERGE source into the key-value pair to be stored in the cache.
     *
     * @param cctx Cache context; consulted for the binary marshaller flag.
     * @param row Source row.
     * @param plan Update plan describing column names, types and key/value suppliers.
     * @return Key-value pair built from the row.
     * @throws IgniteCheckedException If conversion or setting a property fails.
     * @throws IgniteSQLException If the key or the value is {@code null}.
     */
    private IgniteBiTuple<?, ?> rowToKeyValue(GridCacheContext cctx, List<?> row, UpdatePlan plan) throws IgniteCheckedException {
        GridH2RowDescriptor rowDesc = plan.tbl.rowDescriptor();
        GridQueryTypeDescriptor typeDesc = rowDesc.type();

        Object key = plan.keySupplier.apply(row);

        if (QueryUtils.isSqlType((Class)typeDesc.keyClass())) {
            assert plan.keyColIdx != -1;

            key = DmlStatementsProcessor.convert(key, rowDesc, typeDesc.keyClass(), plan.colTypes[plan.keyColIdx]);
        }

        Object val = plan.valSupplier.apply(row);

        if (QueryUtils.isSqlType((Class)typeDesc.valueClass())) {
            assert plan.valColIdx != -1;

            val = DmlStatementsProcessor.convert(val, rowDesc, typeDesc.valueClass(), plan.colTypes[plan.valColIdx]);
        }

        if (key == null) {
            throw new IgniteSQLException("Key for INSERT or MERGE must not be null", 4003);
        }

        if (val == null) {
            throw new IgniteSQLException("Value for INSERT or MERGE must not be null", 4004);
        }

        // Collect converted values of all columns other than the key and value columns themselves.
        HashMap<String, Object> colVals = new HashMap<String, Object>();

        for (int c = 0; c < plan.colNames.length; ++c) {
            boolean keyOrValCol = c == plan.keyColIdx || c == plan.valColIdx;

            if (keyOrValCol) {
                continue;
            }

            String name = plan.colNames[c];

            GridQueryProperty prop = typeDesc.property(name);

            assert prop != null;

            Class expCls = prop.type();

            colVals.put(name, DmlStatementsProcessor.convert(row.get(c), rowDesc, expCls, plan.colTypes[c]));
        }

        // Properties are assigned in table column order, skipping the first three columns.
        Column[] tblCols = plan.tbl.getColumns();

        for (int colIdx = 3; colIdx < tblCols.length; ++colIdx) {
            if (plan.tbl.rowDescriptor().isKeyValueOrVersionColumn(colIdx)) {
                continue;
            }

            String name = tblCols[colIdx].getName();

            // containsKey() is required: a present column may legitimately map to null.
            if (!colVals.containsKey(name)) {
                continue;
            }

            Object colVal = colVals.get(name);

            typeDesc.setValue(name, key, val, colVal);
        }

        if (cctx.binaryMarshaller()) {
            if (key instanceof BinaryObjectBuilder) {
                key = ((BinaryObjectBuilder)key).build();
            }

            if (val instanceof BinaryObjectBuilder) {
                val = ((BinaryObjectBuilder)val).build();
            }
        }

        return new IgniteBiTuple(key, val);
    }

    /**
     * Tells whether the prepared H2 command is one of MERGE, INSERT, UPDATE or DELETE.
     *
     * @param stmt Prepared command.
     * @return {@code true} if the command is an instance of one of the four DML command classes.
     */
    static boolean isDmlStatement(Prepared stmt) {
        if (stmt instanceof Merge) {
            return true;
        }

        if (stmt instanceof Insert) {
            return true;
        }

        if (stmt instanceof Update) {
            return true;
        }

        return stmt instanceof Delete;
    }

    /**
     * Groups entry processors by the primary node of their key and sends them to the cache in batches,
     * accumulating the update counter, the keys reported back by the processors and the error chain.
     */
    private static class BatchSender {
        /** Cache context used for affinity mapping and invocation. */
        private final GridCacheContext cctx;

        /** Batch size threshold; when it is zero or negative every added key is sent immediately. */
        private final int size;

        /** Pending batches by primary node id. */
        private final Map<UUID, Map<Object, EntryProcessor<Object, Object, Boolean>>> batches = new HashMap<UUID, Map<Object, EntryProcessor<Object, Object, Boolean>>>();

        /** Sum of the per-page update counters. */
        private long updateCnt;

        /** Keys reported by processed pages; {@code null} until the first batch is sent. */
        private List<Object> failedKeys;

        /** Head of the chain of exceptions collected from processed pages. */
        private SQLException err;

        /**
         * @param cctx Cache context.
         * @param size Batch size threshold.
         */
        public BatchSender(GridCacheContext cctx, int size) {
            this.cctx = cctx;
            this.size = size;
        }

        /**
         * Queues a processor for the key and sends the key's node batch once it reaches the threshold.
         *
         * @param key Cache key.
         * @param proc Processor to invoke on the key.
         * @throws IgniteCheckedException If the key cannot be mapped to a node or sending fails.
         */
        public void add(Object key, EntryProcessor<Object, Object, Boolean> proc) throws IgniteCheckedException {
            ClusterNode node = this.cctx.affinity().primaryByKey(key, AffinityTopologyVersion.NONE);

            if (node == null) {
                throw new IgniteCheckedException("Failed to map key to node.");
            }

            UUID nodeId = node.id();

            Map<Object, EntryProcessor<Object, Object, Boolean>> batch = this.batches.get(nodeId);

            if (batch == null) {
                batch = new HashMap<Object, EntryProcessor<Object, Object, Boolean>>();

                this.batches.put(nodeId, batch);
            }

            batch.put(key, proc);

            if (batch.size() >= this.size) {
                this.sendBatch(batch);

                batch.clear();
            }
        }

        /**
         * Sends every non-empty pending batch.
         *
         * @throws IgniteCheckedException If sending fails.
         */
        public void flush() throws IgniteCheckedException {
            for (Map<Object, EntryProcessor<Object, Object, Boolean>> batch : this.batches.values()) {
                if (batch.isEmpty()) {
                    continue;
                }

                this.sendBatch(batch);

                // Drop what has been sent so that a repeated flush() or a later add() cannot resend it.
                batch.clear();
            }
        }

        /**
         * @return Sum of the update counters of all pages sent so far.
         */
        public long updateCount() {
            return this.updateCnt;
        }

        /**
         * @return Keys reported by the pages sent so far; an empty list if nothing has been sent.
         */
        public List<Object> failedKeys() {
            return this.failedKeys != null ? this.failedKeys : Collections.emptyList();
        }

        /**
         * @return Head of the collected exception chain or {@code null} if no page reported an error.
         */
        public SQLException error() {
            return this.err;
        }

        /**
         * Processes one batch and folds its outcome into the accumulated state.
         *
         * @param batch Processors by key.
         * @throws IgniteCheckedException If page processing fails.
         */
        private void sendBatch(Map<Object, EntryProcessor<Object, Object, Boolean>> batch) throws IgniteCheckedException {
            PageProcessingResult pageRes = DmlStatementsProcessor.processPage(this.cctx, batch);

            this.updateCnt += pageRes.cnt;

            if (this.failedKeys == null) {
                this.failedKeys = new ArrayList<Object>();
            }

            this.failedKeys.addAll(F.asList((Object[])pageRes.errKeys));

            if (pageRes.ex != null) {
                if (this.err == null) {
                    this.err = pageRes.ex;
                } else {
                    // SQLException.setNextException appends to the end of the existing chain.
                    this.err.setNextException(pageRes.ex);
                }
            }
        }
    }

    /**
     * Outcome of splitting entry processor results: keys that yielded a result without throwing, the
     * number of keys that threw, and the chain of exceptions built for them.
     */
    private static final class PageProcessingErrorResult {
        /** Keys whose result was obtained without an exception; never {@code null}. */
        @NotNull
        final Object[] errKeys;

        /** Number of keys whose processing threw. */
        final int cnt;

        /** Head of the exception chain; {@code null} exactly when {@link #cnt} is zero. */
        final SQLException ex;

        /**
         * @param errKeys Keys that yielded a result; {@code null} is replaced by an empty array.
         * @param ex Head of the exception chain.
         * @param exCnt Number of keys that threw.
         */
        private PageProcessingErrorResult(@NotNull Object[] errKeys, SQLException ex, int exCnt) {
            Object[] keys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);

            // Either there are no failures and no exception, or there is at least one of each.
            assert (exCnt == 0) ^ (ex != null);

            this.ex = ex;
            this.cnt = exCnt;
            this.errKeys = keys;
        }
    }

    /**
     * Summary of one processed page: update counter, keys reported by the processors and the error chain.
     */
    private static final class PageProcessingResult {
        /** Number of rows counted as processed for the page. */
        final long cnt;

        /** Keys reported by the page; never {@code null}. */
        @NotNull
        final Object[] errKeys;

        /** Head of the exception chain for the page, if any. */
        final SQLException ex;

        /**
         * @param cnt Number of rows counted as processed.
         * @param errKeys Reported keys; {@code null} is replaced by an empty array.
         * @param ex Head of the exception chain or {@code null}.
         */
        private PageProcessingResult(long cnt, Object[] errKeys, SQLException ex) {
            Object[] keys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);

            this.cnt = cnt;
            this.errKeys = keys;
            this.ex = ex;
        }
    }

    /**
     * Result of a DML attempt: the number of affected items and the keys that were left unprocessed.
     */
    private static final class UpdateResult {
        /** Shared result for exactly one affected item and no unprocessed keys. */
        static final UpdateResult ONE = new UpdateResult(1L, X.EMPTY_OBJECT_ARRAY);

        /** Shared result for no affected items and no unprocessed keys. */
        static final UpdateResult ZERO = new UpdateResult(0L, X.EMPTY_OBJECT_ARRAY);

        /** Number of affected items. */
        final long cnt;

        /** Unprocessed keys; never {@code null}. */
        @NotNull
        final Object[] errKeys;

        /**
         * @param cnt Number of affected items.
         * @param errKeys Unprocessed keys; {@code null} is replaced by an empty array.
         */
        private UpdateResult(long cnt, Object[] errKeys) {
            Object[] keys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);

            this.errKeys = keys;
            this.cnt = cnt;
        }
    }

    private static final class EntryValueUpdater
    implements IgniteInClosure<MutableEntry<Object, Object>> {
        private final Object val;

        private EntryValueUpdater(Object val) {
            assert (val != null);
            this.val = val;
        }

        public void apply(MutableEntry<Object, Object> e) {
            e.setValue(this.val);
        }
    }

    private static final class ModifyingEntryProcessor
    implements EntryProcessor<Object, Object, Boolean> {
        private final Object val;
        private final IgniteInClosure<MutableEntry<Object, Object>> entryModifier;

        private ModifyingEntryProcessor(Object val, IgniteInClosure<MutableEntry<Object, Object>> entryModifier) {
            assert (val != null);
            this.val = val;
            this.entryModifier = entryModifier;
        }

        public Boolean process(MutableEntry<Object, Object> entry, Object ... arguments) throws EntryProcessorException {
            if (!entry.exists()) {
                return null;
            }
            Object entryVal = entry.getValue();
            if (entryVal == null) {
                return null;
            }
            if (!F.eq((Object)entryVal, (Object)this.val)) {
                return false;
            }
            this.entryModifier.apply(entry);
            return null;
        }
    }

    private static final class InsertEntryProcessor
    implements EntryProcessor<Object, Object, Boolean> {
        private final Object val;

        private InsertEntryProcessor(Object val) {
            this.val = val;
        }

        public Boolean process(MutableEntry<Object, Object> entry, Object ... arguments) throws EntryProcessorException {
            if (entry.exists()) {
                return false;
            }
            entry.setValue(this.val);
            return null;
        }
    }
}

