@@ -71,7 +71,7 @@ public class TemplateRow extends TemplateBase
private static final OpOp1[] SUPPORTED_VECT_UNARY = new OpOp1[]{
OpOp1.EXP, OpOp1.SQRT, OpOp1.LOG, OpOp1.ABS, OpOp1.ROUND, OpOp1.CEIL, OpOp1.FLOOR, OpOp1.SIGN,
OpOp1.SIN, OpOp1.COS, OpOp1.TAN, OpOp1.ASIN, OpOp1.ACOS, OpOp1.ATAN, OpOp1.SINH, OpOp1.COSH, OpOp1.TANH,
- OpOp1.CUMSUM, OpOp1.CUMMIN, OpOp1.CUMMAX, OpOp1.SPROP, OpOp1.SIGMOID};
+ OpOp1.CUMSUM, OpOp1.ROWCUMSUM, OpOp1.CUMMIN, OpOp1.CUMMAX, OpOp1.SPROP, OpOp1.SIGMOID};
private static final OpOp2[] SUPPORTED_VECT_BINARY = new OpOp2[]{
OpOp2.MULT, OpOp2.DIV, OpOp2.MINUS, OpOp2.PLUS, OpOp2.POW, OpOp2.MIN, OpOp2.MAX, OpOp2.XOR,
OpOp2.EQUAL, OpOp2.NOTEQUAL, OpOp2.LESS, OpOp2.LESSEQUAL, OpOp2.GREATER, OpOp2.GREATEREQUAL,
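For reference, the new OpOp1.ROWCUMSUM entry enables row-wise cumulative sums in the row-template codegen. Below is a minimal plain-Java sketch of the assumed semantics (prefix sums along each row, in contrast to the existing column-wise CUMSUM); the class and method names are illustrative only, not the SystemDS kernel:

```java
// Illustration only: assumed ROWCUMSUM semantics, Y[i][j] = X[i][0] + ... + X[i][j].
// The existing CUMSUM accumulates down each column instead.
public class RowCumsumSketch {
	public static double[][] rowCumsum(double[][] x) {
		double[][] y = new double[x.length][];
		for (int i = 0; i < x.length; i++) {
			y[i] = new double[x[i].length];
			double sum = 0;
			for (int j = 0; j < x[i].length; j++) {
				sum += x[i][j];
				y[i][j] = sum; // running sum within the row
			}
		}
		return y;
	}

	public static void main(String[] args) {
		double[][] x = {{1, 2, 3}, {4, 5, 6}};
		// expected: [1.0, 3.0, 6.0] and [4.0, 9.0, 15.0]
		for (double[] row : rowCumsum(x))
			System.out.println(java.util.Arrays.toString(row));
	}
}
```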
@@ -481,6 +481,7 @@ public static long getInstNFLOP(
costs = 40;
break;
case "ucumk+":
case "urowcumk+":
case "ucummin":
case "ucummax":
case "ucum*":
@@ -145,6 +145,7 @@ public class GPUInstructionParser extends InstructionParser

// Cumulative Ops
String2GPUInstructionType.put( "ucumk+" , GPUINSTRUCTION_TYPE.BuiltinUnary);
String2GPUInstructionType.put( "urowcumk+", GPUINSTRUCTION_TYPE.BuiltinUnary);
String2GPUInstructionType.put( "ucum*" , GPUINSTRUCTION_TYPE.BuiltinUnary);
String2GPUInstructionType.put( "ucumk+*" , GPUINSTRUCTION_TYPE.BuiltinUnary);
String2GPUInstructionType.put( "ucummin" , GPUINSTRUCTION_TYPE.BuiltinUnary);
@@ -51,6 +51,8 @@ private CumulativeOffsetFEDInstruction(Operator op, CPOperand in1, CPOperand in2

if ("bcumoffk+".equals(opcode))
_uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucumk+"));
else if ("browcumoffk+".equals(opcode))
_uop = new UnaryOperator(Builtin.getBuiltinFnObject("urowcumk+"));
else if ("bcumoff*".equals(opcode))
_uop = new UnaryOperator(Builtin.getBuiltinFnObject("ucum*"));
else if ("bcumoff+*".equals(opcode))
@@ -76,7 +76,7 @@ public static UnaryMatrixFEDInstruction parseInstruction(String str) {
in.split(parts[1]);
out.split(parts[2]);
ValueFunction func = Builtin.getBuiltinFnObject(opcode);
if(Arrays.asList(new String[] {"ucumk+", "ucum*", "ucumk+*", "ucummin", "ucummax", "exp", "log", "sigmoid"})
if(Arrays.asList(new String[] {"ucumk+", "urowcumk+", "ucum*", "ucumk+*", "ucummin", "ucummax", "exp", "log", "sigmoid"})
.contains(opcode)) {
UnaryOperator op = new UnaryOperator(func, Integer.parseInt(parts[3]), Boolean.parseBoolean(parts[4]));
return new UnaryMatrixFEDInstruction(op, in, out, opcode, str);
@@ -92,6 +92,10 @@ public void processInstruction(ExecutionContext ec) {
LibMatrixCUDA.cumulativeScan(ec, ec.getGPUContext(0), getExtendedOpcode(), "cumulative_sum", mat,
_output.getName());
break;
case "urowcumk+":
LibMatrixCUDA.cumulativeScan(ec, ec.getGPUContext(0), getExtendedOpcode(), "row_cumulative_sum", mat,
_output.getName());
break;
case "ucum*":
LibMatrixCUDA.cumulativeScan(ec, ec.getGPUContext(0), getExtendedOpcode(), "cumulative_prod", mat,
_output.getName());
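The GPU path above only dispatches to a kernel named "row_cumulative_sum" through LibMatrixCUDA.cumulativeScan; the kernel itself is not part of this hunk. As a hedged sketch of why the operation parallelizes naturally (each row scan is independent, so rows can be processed concurrently while each row stays a sequential prefix sum), here is a plain-Java analogue using a parallel stream over rows. This is an assumed illustration, not the actual CUDA implementation:

```java
import java.util.stream.IntStream;

// Illustration only: rows are scanned independently, which is what makes a row-wise
// cumulative sum easy to parallelize across rows (e.g., one worker per row).
public class ParallelRowScanSketch {
	public static void rowCumsumInPlace(double[][] x) {
		IntStream.range(0, x.length).parallel().forEach(i -> {
			double sum = 0;
			for (int j = 0; j < x[i].length; j++) {
				sum += x[i][j];
				x[i][j] = sum; // sequential prefix sum within the row
			}
		});
	}
}
```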
@@ -25,9 +25,11 @@
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.functionobjects.Builtin;
+ import org.apache.sysds.runtime.functionobjects.KahanPlus;
import org.apache.sysds.runtime.functionobjects.PlusMultiply;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
+ import org.apache.sysds.runtime.instructions.cp.KahanObject;
import org.apache.sysds.runtime.instructions.spark.utils.RDDAggregateUtils;
import org.apache.sysds.runtime.instructions.spark.utils.SparkUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -52,62 +54,135 @@ public static CumulativeAggregateSPInstruction parseInstruction( String str ) {
CPOperand in1 = new CPOperand(parts[1]);
CPOperand out = new CPOperand(parts[2]);
AggregateUnaryOperator aggun = InstructionUtils.parseCumulativeAggregateUnaryOperator(opcode);
- return new CumulativeAggregateSPInstruction(aggun, in1, out, opcode, str);
+ return new CumulativeAggregateSPInstruction(aggun, in1, out, opcode, str);
}

@Override
public void processInstruction(ExecutionContext ec) {
SparkExecutionContext sec = (SparkExecutionContext)ec;
DataCharacteristics mc = sec.getDataCharacteristics(input1.getName());

+ //get input
+ JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() );
+
+ if ("urowcumk+".equals(getOpcode())) {
+ processRowCumsum(sec, in, mc);
+ } else {
+ processCumsum(sec, in, mc);
+ }
+ }
+
+ private void processRowCumsum(SparkExecutionContext sec, JavaPairRDD<MatrixIndexes,MatrixBlock> in, DataCharacteristics mc) {
+ JavaPairRDD<MatrixIndexes, MatrixBlock> localRowCumsum =
+ in.mapToPair(new LocalRowCumsumFunction());
+
+ sec.setRDDHandleForVariable(output.getName(), localRowCumsum);
+ sec.addLineageRDD(output.getName(), input1.getName());
+ sec.getDataCharacteristics(output.getName()).set(mc);
+ }
+
+ public static Tuple2<JavaPairRDD<MatrixIndexes, MatrixBlock>, JavaPairRDD<MatrixIndexes, MatrixBlock>>
+ processRowCumsumWithEndValues(JavaPairRDD<MatrixIndexes,MatrixBlock> in) {
+ JavaPairRDD<MatrixIndexes, MatrixBlock> localRowCumsum =
+ in.mapToPair(new LocalRowCumsumFunction());
+
+ JavaPairRDD<MatrixIndexes, MatrixBlock> endValues =
+ localRowCumsum.mapToPair(new ExtractEndValuesFunction());
+
+ return new Tuple2<>(localRowCumsum, endValues);
+ }
+
+ private void processCumsum(SparkExecutionContext sec, JavaPairRDD<MatrixIndexes,MatrixBlock> in, DataCharacteristics mc) {
DataCharacteristics mcOut = new MatrixCharacteristics(mc);
long rlen = mc.getRows();
int blen = mc.getBlocksize();
mcOut.setRows((long)(Math.ceil((double)rlen/blen)));

- //get input
- JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryMatrixBlockRDDHandleForVariable( input1.getName() );
-
+
//execute unary aggregate (w/ implicit drop correction)
AggregateUnaryOperator auop = (AggregateUnaryOperator) _optr;
- JavaPairRDD<MatrixIndexes,MatrixBlock> out =
- in.mapToPair(new RDDCumAggFunction(auop, rlen, blen));
+ JavaPairRDD<MatrixIndexes,MatrixBlock> out =
+ in.mapToPair(new RDDCumAggFunction(auop, rlen, blen));
//merge partial aggregates, adjusting for correct number of partitions
//as size can significantly shrink (1K) but also grow (sparse-dense)
int numParts = SparkUtils.getNumPreferredPartitions(mcOut);
int minPar = (int)Math.min(SparkExecutionContext.getDefaultParallelism(true), mcOut.getNumBlocks());
out = RDDAggregateUtils.mergeByKey(out, Math.max(numParts, minPar), false);

//put output handle in symbol table
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), input1.getName());
sec.getDataCharacteristics(output.getName()).set(mcOut);
}

- private static class RDDCumAggFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock>
+ private static class LocalRowCumsumFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> {
+ private static final long serialVersionUID = 123L;
+
+ @Override
+ public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception {
+ MatrixIndexes idx = kv._1;
+ MatrixBlock inputBlock = kv._2;
+ MatrixBlock outBlock = new MatrixBlock(inputBlock.getNumRows(), inputBlock.getNumColumns(), false);
+
+ for (int i = 0; i < inputBlock.getNumRows(); i++) {
+ KahanObject kbuff = new KahanObject(0, 0);
+ KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+
+ for (int j = 0; j < inputBlock.getNumColumns(); j++) {
+ double val = inputBlock.get(i, j);
+ kplus.execute2(kbuff, val);
+ outBlock.set(i, j, kbuff._sum);
+ }
+ }
+ // return the original block index together with the block-local row cumsum
+ return new Tuple2<>(idx, outBlock);
+ }
+ }
+
+ private static class ExtractEndValuesFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> {
+ private static final long serialVersionUID = 123L;
+
+ @Override
+ public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception {
+ MatrixIndexes idx = kv._1;
+ MatrixBlock cumsumBlock = kv._2;
+
+ MatrixBlock endValuesBlock = new MatrixBlock(cumsumBlock.getNumRows(), 1, false);
+ for (int i = 0; i < cumsumBlock.getNumRows(); i++) {
+ if (cumsumBlock.getNumColumns() > 0) {
+ endValuesBlock.set(i, 0, cumsumBlock.get(i, cumsumBlock.getNumColumns() - 1));
+ } else {
+ endValuesBlock.set(i, 0, 0.0);
+ }
+ }
+ return new Tuple2<>(idx, endValuesBlock);
+ }
+ }
+
+ private static class RDDCumAggFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock>
{
private static final long serialVersionUID = 11324676268945117L;

private final AggregateUnaryOperator _op;
private UnaryOperator _uop = null;
private final long _rlen;
private final int _blen;

public RDDCumAggFunction( AggregateUnaryOperator op, long rlen, int blen ) {
_op = op;
_rlen = rlen;
_blen = blen;
}

@Override
- public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 )
- throws Exception
+ public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 )
+ throws Exception
{
MatrixIndexes ixIn = arg0._1();
MatrixBlock blkIn = arg0._2();

MatrixIndexes ixOut = new MatrixIndexes();
MatrixBlock blkOut = new MatrixBlock();

//process instruction
AggregateUnaryOperator aop = _op;
if( aop.aggOp.increOp.fn instanceof PlusMultiply ) { //cumsumprod
@@ -125,19 +200,19 @@ public Tuple2<MatrixIndexes, MatrixBlock> call( Tuple2<MatrixIndexes, MatrixBloc
if( aop.aggOp.existsCorrection() )
blkOut.dropLastRowsOrColumns(aop.aggOp.correction);
}

//cumsum expand partial aggregates
long rlenOut = (long)Math.ceil((double)_rlen/_blen);
long rixOut = (long)Math.ceil((double)ixIn.getRowIndex()/_blen);
int rlenBlk = (int) Math.min(rlenOut-(rixOut-1)*_blen, _blen);
int clenBlk = blkOut.getNumColumns();
int posBlk = (int) ((ixIn.getRowIndex()-1) % _blen);

//construct sparse output blocks (single row in target block size)
MatrixBlock blkOut2 = new MatrixBlock(rlenBlk, clenBlk, true);
blkOut2.copy(posBlk, posBlk, 0, clenBlk-1, blkOut, true);
ixOut.setIndexes(rixOut, ixOut.getColumnIndex());

//output new tuple
return new Tuple2<>(ixOut, blkOut2);
}
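Note that LocalRowCumsumFunction only produces block-local row prefix sums, and ExtractEndValuesFunction exposes each block's last column so that a downstream offset step (presumably the browcumoffk+ path registered above) can stitch horizontally split column blocks together. Below is a self-contained sketch of that composition on plain arrays; the class and helper names are illustrative assumptions, not the SystemDS implementation:

```java
// Illustration only: combining block-local row cumsums of horizontally split blocks.
// For a right-hand column block, the final result is its local cumsum plus, per row,
// the end value (last column) of the block to its left.
public class RowCumsumBlockCompositionSketch {
	// block-local row cumsum (same idea as LocalRowCumsumFunction, on a plain array)
	static double[][] localRowCumsum(double[][] block) {
		double[][] out = new double[block.length][];
		for (int i = 0; i < block.length; i++) {
			out[i] = new double[block[i].length];
			double sum = 0;
			for (int j = 0; j < block[i].length; j++)
				out[i][j] = (sum += block[i][j]);
		}
		return out;
	}

	// end values = last column of each local cumsum (same idea as ExtractEndValuesFunction)
	static double[] endValues(double[][] localCumsum) {
		double[] end = new double[localCumsum.length];
		for (int i = 0; i < localCumsum.length; i++)
			end[i] = localCumsum[i].length > 0 ? localCumsum[i][localCumsum[i].length - 1] : 0.0;
		return end;
	}

	public static void main(String[] args) {
		// one 2x5 matrix split into two column blocks
		double[][] left  = {{1, 2, 3}, {1, 1, 1}};
		double[][] right = {{4, 5}, {1, 1}};

		double[][] leftCum  = localRowCumsum(left);   // [[1,3,6],[1,2,3]]
		double[][] rightCum = localRowCumsum(right);  // [[4,9],[1,2]]
		double[] offsets = endValues(leftCum);        // [6, 3] = per-row carry into the right block

		// offset step (done locally here; in SystemDS this would be a separate offset instruction)
		for (int i = 0; i < rightCum.length; i++)
			for (int j = 0; j < rightCum[i].length; j++)
				rightCum[i][j] += offsets[i];

		System.out.println(java.util.Arrays.deepToString(leftCum));  // [[1.0, 3.0, 6.0], [1.0, 2.0, 3.0]]
		System.out.println(java.util.Arrays.deepToString(rightCum)); // [[10.0, 15.0], [4.0, 5.0]]
	}
}
```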