diff --git a/src/redset_reedsolomon.cu b/src/redset_reedsolomon.cu
index 7ca4db6..5154d7b 100644
--- a/src/redset_reedsolomon.cu
+++ b/src/redset_reedsolomon.cu
@@ -279,6 +279,14 @@ int redset_encode_reddesc_rs(
 }
 
 #if ENABLE_CUDA
+__global__ void add_gpu(unsigned char* a, unsigned char* b, int n)
+{
+  size_t i = blockDim.x * blockIdx.x + threadIdx.x;
+  if (i < n) {
+    a[i] ^= b[i];
+  }
+}
+
 __global__ void multadd_gpu(unsigned int* gf_log, unsigned int* gf_exp, int gf_size, size_t count, unsigned char* dbuf, unsigned int coeff, unsigned char* rbuf)
 {
   /* TODO: read gf_log into gf_exp thread-shared memory */
@@ -299,6 +307,27 @@ __global__ void multadd_gpu(unsigned int* gf_log, unsigned int* gf_exp, int gf_s
     }
   }
 }
+
+__global__ void scale_gpu(unsigned int* gf_log, unsigned int* gf_exp, int gf_size, size_t count, unsigned char* dbuf, unsigned int coeff)
+{
+  /* TODO: read gf_log into gf_exp thread-shared memory */
+
+  size_t i = blockDim.x * blockIdx.x + threadIdx.x;
+  if (i < count && coeff != 0) {
+    /* 0 times anything is 0, we treat this as a special case since
+     * there is no entry for 0 in the log table below, since there
+     * is no value of x such that 2^x = 0 */
+    int data = dbuf[i];
+    if (data != 0) {
+      /* compute (v1 * v2) product as 2^( log_2(v1) + log_2(v2) ) in GF(2^bits) arithmetic */
+      int sumlogs = gf_log[coeff] + gf_log[data];
+      if (sumlogs >= gf_size - 1) {
+        sumlogs -= (gf_size - 1);
+      }
+      dbuf[i] = (unsigned char) gf_exp[sumlogs];
+    }
+  }
+}
 #endif
 
 /* apply ReedSolomon redundancy scheme to dataset files */
@@ -619,6 +648,243 @@ int redset_apply_rs(
   return rc;
 }
 
+#if ENABLE_CUDA
+static void redset_rs_reduce_decode_gpu(
+  int ranks,
+  redset_reedsolomon* state,
+  unsigned int* gf_log,
+  unsigned int* gf_exp,
+  int chunk_id,
+  int received_rank,
+  int missing,
+  int* rows,
+  int count,
+  unsigned char* recv_buf,
+  unsigned char* data_bufs_dev)
+{
+  int i;
+
+  /* determine encoding block this rank is responsible for in this chunk */
+  int received_enc = redset_rs_get_encoding_id(ranks, state->encoding, received_rank, chunk_id);
+  if (received_enc < ranks) {
+    /* the data we received from this rank constitutes actual data,
+     * so we need to encode it by adding it to our sum */
+    for (i = 0; i < missing; i++) {
+      /* identify row for the data buffer in the encoding matrix,
+       * then select the matrix element for the given rank,
+       * finally multiply received data by that coefficient and add
+       * it to the data buffer */
+      int row = rows[i] + ranks;
+      unsigned int coeff = state->mat[row * ranks + received_rank];
+
+      unsigned char* dbuf = data_bufs_dev + i * redset_mpi_buf_size;
+      int nthreads = 1024;
+      int nblocks = (count + nthreads - 1) / nthreads;
+      multadd_gpu<<<nblocks, nthreads>>>(gf_log, gf_exp, state->gf_size, count, dbuf, coeff, recv_buf);
+    }
+  } else {
+    /* in this case, the rank is responsible for holding a
+     * checksum block */
+    for (i = 0; i < missing; i++) {
+      /* get encoding row for the current data buffer */
+      int row = rows[i] + ranks;
+      if (row == received_enc) {
+        /* in this case, we have the checksum, just add it in */
+        unsigned char* dbuf = data_bufs_dev + i * redset_mpi_buf_size;
+        int nthreads = 1024;
+        int nblocks = (count + nthreads - 1) / nthreads;
+        add_gpu<<<nblocks, nthreads>>>(dbuf, recv_buf, count);
+      } else {
+        /* otherwise, this rank would have contributed
+         * 0-data for this chunk and for the selected encoding row */
+      }
+    }
+  }
+
+  cudaDeviceSynchronize();
+
+  return;
+}
+
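+/* The host-side helper below mirrors the log/exp table lookups performed by
+ * the scale_gpu and multadd_gpu kernels; it is applied to the coefficient
+ * matrix, which stays in host memory while the data buffers live on the
+ * device. */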
+/* computed product of v1 * v2 using log and inverse log table lookups */
+static unsigned int gf_mult_table_gpu(const redset_reedsolomon* state, unsigned int v1, unsigned int v2)
+{
+  /* 0 times anything is 0, we treat this as a special case since
+   * there is no entry for 0 in the log table below, since there
+   * is no value of x such that 2^x = 0 */
+  if (v1 == 0 || v2 == 0) {
+    return 0;
+  }
+
+  /* compute (v1 * v2) product as 2^( log_2(v1) + log_2(v2) ) in GF(2^bits) arithmetic */
+  int sumlogs = state->gf_log[v1] + state->gf_log[v2];
+  if (sumlogs >= state->gf_size - 1) {
+    sumlogs -= (state->gf_size - 1);
+  }
+  int prod = state->gf_exp[sumlogs];
+
+#if 0
+  if (v1 >= state->gf_size ||
+      v2 >= state->gf_size ||
+      sumlogs >= state->gf_size - 1)
+  {
+    printf("ERROR!!!!!\n"); fflush(stdout);
+  }
+#endif
+
+  return prod;
+}
+
+/* scales a row r in a coefficient matrix in mat of size (rows x cols)
+ * and an array of count values given in buf by a constant value val */
+static void scale_row_gpu(
+  redset_reedsolomon* state,
+  unsigned int* gf_log,
+  unsigned int* gf_exp,
+  unsigned int* mat,  /* coefficient matrix */
+  int rows,           /* number of rows in mat */
+  int cols,           /* number of cols in mat */
+  unsigned int val,   /* constant to multiply elements by */
+  int r,              /* row within mat to be scaled by val */
+  int count,          /* number of elements in buf */
+  unsigned char* buf) /* list of values to be scaled by val */
+{
+  /* scale values across given row */
+  int col;
+  for (col = 0; col < cols; col++) {
+    mat[r * cols + col] = gf_mult_table_gpu(state, val, mat[r * cols + col]);
+  }
+
+  /* scale all values in buffer */
+  int nthreads = 1024;
+  int nblocks = (count + nthreads - 1) / nthreads;
+  scale_gpu<<<nblocks, nthreads>>>(gf_log, gf_exp, state->gf_size, count, buf, val);
+
+  return;
+}
+
+/* multiply row a by the constant val, and add to row b in matrix,
+ * and multiply elements in bufa and add to bufb element wise */
+static void mult_add_row_gpu(
+  redset_reedsolomon* state,
+  unsigned int* gf_log,
+  unsigned int* gf_exp,
+  unsigned int* mat,
+  int rows,
+  int cols,
+  unsigned int val,
+  int a,
+  int b,
+  int count,
+  unsigned char* bufa,
+  unsigned char* bufb)
+{
+  /* no need to do anything if we've zero'd out the row we're adding */
+  if (val == 0) {
+    return;
+  }
+
+  /* multiply row a by val and add to row b */
+  int col;
+  for (col = 0; col < cols; col++) {
+    mat[b * cols + col] ^= (unsigned char) gf_mult_table_gpu(state, val, mat[a * cols + col]);
+  }
+
+  /* multiply values in bufa by val and add to bufb */
+  int nthreads = 1024;
+  int nblocks = (count + nthreads - 1) / nthreads;
+  multadd_gpu<<<nblocks, nthreads>>>(gf_log, gf_exp, state->gf_size, count, bufb, val, bufa);
+
+  return;
+}
+
+/* given matrix in mat of size (rows x cols) swap columns a and b */
+static void swap_columns_gpu(unsigned int* mat, int rows, int cols, int a, int b)
+{
+  /* nothing to do if source and destination columns are the same */
+  if (a == b) {
+    return;
+  }
+
+  /* otherwise march down row and swap elements between column a and b */
+  int row;
+  for (row = 0; row < rows; row++) {
+    unsigned int val = mat[row * cols + a];
+    mat[row * cols + a] = mat[row * cols + b];
+    mat[row * cols + b] = val;
+  }
+}
+
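+/* In the Gaussian elimination below, the coefficient matrix m is updated on
+ * the host via gf_mult_table_gpu, while the corresponding right-hand-side
+ * buffers are updated on the device by scale_gpu/multadd_gpu kernel launches;
+ * cudaDeviceSynchronize() is called after each row so the device buffers are
+ * complete before they are reused as inputs in the next step. */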
+/* solve for x in Ax = b, where A (given in m) is a matrix of size (missing x missing)
+ * using Gaussian elimination to convert A into an identity matrix,
+ * here x and b are really matrices of size [missing, count] for count number of
+ * individual [missing, 1] vectors */
+static void redset_rs_gaussian_solve_gpu(
+  redset_reedsolomon* state,
+  unsigned int* gf_log,
+  unsigned int* gf_exp,
+  unsigned int* m,     /* coefficient matrix to be reduced to an identity matrix */
+  int missing,         /* number of rows and columns in m */
+  int count,           /* length of buf arrays */
+  unsigned char* bufs) /* a list of count values for each of the missing unknowns */
+{
+  /* zero out lower portion of matrix */
+  int row;
+  for (row = 0; row < missing; row++) {
+    /* search for first element in current row that is non-zero */
+    int col;
+    int nonzero = row;
+    for (col = row; col < missing; col++) {
+      unsigned int val = m[row * missing + col];
+      if (val > 0) {
+        nonzero = col;
+        break;
+      }
+    }
+
+    /* swap columns to ensure we have a nonzero in current starting position */
+    swap_columns_gpu(m, missing, missing, row, nonzero);
+
+    /* scale current row to start with a 1 */
+    unsigned int val = m[row * missing + row];
+    if (val != 0) {
+      unsigned int imult = state->gf_imult[val];
+      unsigned char* dbuf = bufs + row * redset_mpi_buf_size;
+      scale_row_gpu(state, gf_log, gf_exp, m, missing, missing, imult, row, count, dbuf);
+      cudaDeviceSynchronize();
+    }
+
+    /* subtract current row from each row below to zero out any leading 1 */
+    int r;
+    for (r = row + 1; r < missing; r++) {
+      /* multiply the target row by the leading term and subtract from the current row */
+      unsigned int val = m[r * missing + row];
+      unsigned char* abuf = bufs + row * redset_mpi_buf_size;
+      unsigned char* bbuf = bufs + r * redset_mpi_buf_size;
+      mult_add_row_gpu(state, gf_log, gf_exp, m, missing, missing, val, row, r, count, abuf, bbuf);
+    }
+    cudaDeviceSynchronize();
+  }
+
+  /* zero out upper portion of matrix */
+  for (row = missing - 1; row > 0; row--) {
+    /* for each row, compute factor needed to cancel out entry in current column,
+     * multiply target row and subtract from current row */
+    int r;
+    for (r = row - 1; r >= 0; r--) {
+      /* multiply the target row by the leading term and subtract from the current row */
+      unsigned int val = m[r * missing + row];
+      unsigned char* abuf = bufs + row * redset_mpi_buf_size;
+      unsigned char* bbuf = bufs + r * redset_mpi_buf_size;
+      mult_add_row_gpu(state, gf_log, gf_exp, m, missing, missing, val, row, r, count, abuf, bbuf);
+    }
+    cudaDeviceSynchronize();
+  }
+
+  return;
+}
+#endif
+
 /* given a filemap, a redundancy descriptor, a dataset id, and a failed rank in my xor set,
  * rebuild files and add them to the filemap */
 int redset_recover_rs_rebuild(
@@ -833,10 +1099,37 @@
 
   /* allocate buffer to read a piece of my file */
   unsigned char** send_bufs = (unsigned char**) redset_buffers_alloc(1, redset_mpi_buf_size);
+  unsigned char* sbuf = send_bufs[0];
 
   /* allocate buffer to read a piece of the recevied chunk file,
    * we might get a message from each rank */
   unsigned char** recv_bufs = (unsigned char**) redset_buffers_alloc(d->ranks, redset_mpi_buf_size);
+  unsigned char* rbuf = recv_bufs[0];
+
+#if ENABLE_CUDA
+  unsigned int* gf_log;
+  unsigned int* gf_exp;
+  size_t table_size = state->gf_size * sizeof(unsigned int);
+  cudaMalloc(&gf_log, table_size);
+  cudaMalloc(&gf_exp, table_size);
+  cudaMemcpy(gf_log, state->gf_log, table_size, cudaMemcpyHostToDevice);
+  cudaMemcpy(gf_exp, state->gf_exp, table_size, cudaMemcpyHostToDevice);
+
+  unsigned char* data_bufs_dev;
+  unsigned char* recv_bufs_dev;
+  cudaMalloc((void**)&data_bufs_dev, redset_mpi_buf_size * missing);
+  cudaMalloc((void**)&recv_bufs_dev, redset_mpi_buf_size * d->ranks);
+
+  unsigned char* send_buf_dev;
+  cudaMalloc(&send_buf_dev, redset_mpi_buf_size);
+
+  /* switch send/recv to use device buffers */
+  rbuf = recv_bufs_dev;
+  sbuf = send_buf_dev;
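+
+  /* note: the MPI_Isend/MPI_Irecv calls below are handed these device
+   * pointers directly, which assumes a CUDA-aware MPI implementation */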
+#endif
+
+  /* use a host buffer for reading/writing to files */
+  unsigned char* host_buf = send_bufs[0];
 
   /* this array will map from missing rank number to missing data segment id,
    * which falls in the range [0, d->ranks + state->encoding),
@@ -878,9 +1171,13 @@
   }
 
   /* initialize buffers to accumulate reduction results */
+#if ENABLE_CUDA
+  cudaMemset(data_bufs_dev, 0, redset_mpi_buf_size * missing);
+#else
   for (i = 0; i < missing; i++) {
     memset(data_bufs[i], 0, count);
   }
+#endif
 
   int step_id;
   for (step_id = 0; step_id < d->ranks; step_id++) {
@@ -903,7 +1200,7 @@
         unsigned long offset = chunk_size * (unsigned long) chunk_id_rel + nread;
 
         /* read data from our file */
-        if (redset_lofi_pread(&rsf, send_bufs[0], count, offset) != REDSET_SUCCESS)
+        if (redset_lofi_pread(&rsf, host_buf, count, offset) != REDSET_SUCCESS)
         {
           /* read failed, make sure we fail this rebuild */
           rc = REDSET_FAILURE;
@@ -915,7 +1212,7 @@
           /* seek failed, make sure we fail this rebuild */
           rc = REDSET_FAILURE;
         }
-        if (redset_read_attempt(chunk_file, fd_chunk, send_bufs[0], count) != count) {
+        if (redset_read_attempt(chunk_file, fd_chunk, host_buf, count) != count) {
          /* read failed, make sure we fail this rebuild */
          rc = REDSET_FAILURE;
        }
@@ -923,29 +1220,69 @@
     } else {
       /* if we're rebuilding, initialize our send buffer with 0,
        * so that our input does not contribute to the result */
-      memset(send_bufs[0], 0, count);
+      memset(host_buf, 0, count);
     }
 
     /* pipelined reduce-scatter across ranks */
     if (step_id > 0) {
+/* TODO: send straight from host buffer to avoid memcpy */
+#if ENABLE_CUDA
+      /* copy file data from host to device */
+      cudaMemcpy(sbuf, host_buf, count, cudaMemcpyHostToDevice);
+#else
+      sbuf = host_buf;
+#endif
+
       /* exchange data with neighboring ranks */
-      MPI_Irecv(recv_bufs[0], count, MPI_BYTE, lhs_rank, 0, d->comm, &request[0]);
-      MPI_Isend(send_bufs[0], count, MPI_BYTE, rhs_rank, 0, d->comm, &request[1]);
+      MPI_Irecv(rbuf, count, MPI_BYTE, lhs_rank, 0, d->comm, &request[0]);
+      MPI_Isend(sbuf, count, MPI_BYTE, rhs_rank, 0, d->comm, &request[1]);
       MPI_Waitall(2, request, status);
     } else {
       /* if we're rebuilding, initialize our send buffer with 0,
        * so that our input does not contribute to the result */
-      memcpy(recv_bufs[0], send_bufs[0], count);
+#if ENABLE_CUDA
+      /* copy file data from host to device */
+      cudaMemcpy(rbuf, host_buf, count, cudaMemcpyHostToDevice);
+#else
+      memcpy(rbuf, sbuf, count);
+#endif
     }
 
     /* merge received blocks via xor operation */
-    redset_rs_reduce_decode(d->ranks, state, decode_chunk_id, lhs_rank, missing, rows, count, recv_bufs[0], data_bufs);
+#if ENABLE_CUDA
+    redset_rs_reduce_decode_gpu(d->ranks, state, gf_log, gf_exp, decode_chunk_id, lhs_rank, missing, rows, count, rbuf, data_bufs_dev);
+#else
+    redset_rs_reduce_decode(d->ranks, state, decode_chunk_id, lhs_rank, missing, rows, count, rbuf, data_bufs);
+#endif
+  }
+
+#if 0
+#if ENABLE_CUDA
+  for (i = 0; i < missing; i++) {
+    unsigned char* dbuf = data_bufs_dev + i * redset_mpi_buf_size;
+    cudaMemcpy(data_bufs[i], dbuf, redset_mpi_buf_size, cudaMemcpyDeviceToHost);
   }
+#endif
+#endif
 
   /* at this point, we need to invert our m matrix to solve for unknown values,
-   * we invert a copy because we need to do this operation times */
+   * we invert a copy because we need to do this operation multiple times */
   memcpy(mcopy, m, missing * missing * sizeof(unsigned int));
+
+// redset_rs_gaussian_solve(state, mcopy, missing, count, data_bufs);
+
+#if ENABLE_CUDA
+  redset_rs_gaussian_solve_gpu(state, gf_log, gf_exp, mcopy, missing, count, data_bufs_dev);
+#else
   redset_rs_gaussian_solve(state, mcopy, missing, count, data_bufs);
+#endif
+
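+  /* copy the reconstructed data back from the device into the host-side
+   * data_bufs so the remainder of the rebuild can use it from host memory */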
+#if ENABLE_CUDA
+  for (i = 0; i < missing; i++) {
+    unsigned char* dbuf = data_bufs_dev + i * redset_mpi_buf_size;
+    cudaMemcpy(data_bufs[i], dbuf, redset_mpi_buf_size, cudaMemcpyDeviceToHost);
+  }
+#endif
 
   /* TODO: for large groups, we may want to add some flow control here */
 
@@ -1064,6 +1401,19 @@
    * we do this on every file instead of just the rebuilt files so that we preserve atime on all files */
   redset_lofi_apply_meta(current_hash);
 
+#if ENABLE_CUDA
+  cudaFree(data_bufs_dev);
+  cudaFree(recv_bufs_dev);
+  cudaFree(send_buf_dev);
+  cudaFree(gf_exp);
+  cudaFree(gf_log);
+  data_bufs_dev = NULL;
+  recv_bufs_dev = NULL;
+  send_buf_dev = NULL;
+  gf_exp = NULL;
+  gf_log = NULL;
+#endif
+
   /* free buffers */
   redset_buffers_free(missing, &data_bufs);
   redset_buffers_free(1, &send_bufs);