/*
 * Copyright (c) 2000, 2001, 2002, 2003, 2004, 2005, 2008, 2009
 *	The President and Fellows of Harvard College.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of the University nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE UNIVERSITY AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE UNIVERSITY OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/*
 * psort - parallel sort.
 *
 * This is loosely based on some real parallel sort benchmarks, but
 * because of various limitations of OS/161 it is massively
 * inefficient. But that's ok; the goal is to stress the VM and buffer
 * cache.
 */

/*
 * NOTE(review): the copy of this file that was reviewed had its line
 * breaks collapsed and every "<...>" span removed (all #include
 * targets, and any code lying between a '<' and the next '>').
 * Everything marked "NOTE(review): reconstructed" below was rebuilt
 * from the surrounding code and must be confirmed against the
 * upstream file before this is merged.
 */

/* NOTE(review): reconstructed include list (11 targets were stripped). */
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <assert.h>
#include <errno.h>

#ifndef RANDOM_MAX
/* Note: this is correct for OS/161 but not for some Unix C libraries */
#define RANDOM_MAX RAND_MAX
#endif

#define PATH_KEYS    "sortkeys"
#define PATH_SORTED  "output"
#define PATH_TESTDIR "psortdir"
#define PATH_RANDOM  "rand:"

/* Number of ints in the shared in-memory work buffer. */
#define WORKNUM      (128*1024)

static int workspace[WORKNUM];

static const char *progname;

static int numprocs = 4;
static int numkeys = 10000;
static long randomseed = 15432753;

/* Expected size in bytes of the key file and of the sorted output. */
static off_t correctsize;
/* Byte checksum of the unsorted key file, compared after sorting. */
static unsigned long checksum;

/* Index of this worker among the forked children; NOBODY in the parent. */
#define NOBODY (-1)
static int me = NOBODY;

////////////////////////////////////////////////////////////

/*
 * Sort v[0..num) into ascending order in place.
 *
 * Three-way partition around the middle element: values below the
 * pivot are compacted to the front, values above it are swapped to
 * the back, and the gap in between is refilled with the pivot value.
 * The two outer regions are then sorted recursively.
 */
static
void
sortints(int *v, int num)
{
	int pivotval, pivotpoint, pivotcount;
	int frontpos, readpos, endpos, i, j;
	int tmp;

	if (num < 2) {
		return;
	}

	pivotpoint = num/2;
	pivotval = v[pivotpoint];
	pivotcount = 0;

	frontpos = 0;
	readpos = 0;
	endpos = num;
	while (readpos < endpos) {
		if (v[readpos] < pivotval) {
			v[frontpos++] = v[readpos++];
		}
		else if (v[readpos] == pivotval) {
			/* Only counted; the slot is refilled below. */
			readpos++;
			pivotcount++;
		}
		else {
			tmp = v[--endpos];
			v[endpos] = v[readpos];
			v[readpos] = tmp;
		}
	}
	assert(readpos == endpos);
	assert(frontpos + pivotcount == readpos);

	/* NOTE(review): reconstructed from here to the end of the function. */
	for (i=frontpos; i<endpos; i++) {
		v[i] = pivotval;
	}

	/* Reverse the upper region (it is sorted below in any case). */
	for (i=endpos, j=num-1; i<j; i++, j--) {
		tmp = v[i];
		v[i] = v[j];
		v[j] = tmp;
	}

	sortints(v, frontpos);
	sortints(&v[endpos], num-endpos);
}

////////////////////////////////////////////////////////////

/*
 * Set progname from argv[0] (basename only), or to "psort" if argv[0]
 * is not available.
 *
 * NOTE(review): reconstructed; only the call in main() was visible.
 */
static
void
initprogname(const char *av0)
{
	if (av0) {
		progname = strrchr(av0, '/');
		if (progname) {
			progname++;
		}
		else {
			progname = av0;
		}
	}
	else {
		progname = "psort";
	}
}

/*
 * Format a diagnostic into buf (at most len bytes, always terminated):
 *    "<progname>: [proc <me>: ]<message>[: <strerror(err)>]\n"
 * The "proc" part appears only in a worker (me >= 0); the strerror
 * part appears only when err >= 0.
 *
 * NOTE(review): the signature and the "int pos" declaration are
 * reconstructed from the two visible call sites and the visible body.
 */
static
void
vscomplain(char *buf, size_t len, const char *fmt, va_list ap, int err)
{
	int pos;

	if (me >= 0) {
		snprintf(buf, len, "%s: proc %d: ", progname, me);
	}
	else {
		snprintf(buf, len, "%s: ", progname);
	}
	pos = strlen(buf);

	vsnprintf(buf+pos, len-pos, fmt, ap);
	pos = strlen(buf);

	if (err >= 0) {
		snprintf(buf+pos, len-pos, ": %s\n", strerror(err));
	}
	else {
		snprintf(buf+pos, len-pos, "\n");
	}
}

/*
 * Print a diagnostic to stderr without any errno text.
 */
static
void
complainx(const char *fmt, ...)
{
	int rc;
	char buf[256];
	va_list ap;

	va_start(ap, fmt);
	vscomplain(buf, sizeof(buf), fmt, ap, -1);
	va_end(ap);

	/* Write the message in one go so it's atomic */
	rc = write(STDERR_FILENO, buf, strlen(buf));
	/* suppress compiler warning about setting but not using rc */
	(void)rc;
}

/*
 * Print a diagnostic to stderr followed by the text for the current
 * errno. errno is captured on entry, before anything can clobber it.
 */
static
void
complain(const char *fmt, ...)
{
	int rc;
	char buf[256];
	va_list ap;
	int err = errno;

	va_start(ap, fmt);
	vscomplain(buf, sizeof(buf), fmt, ap, err);
	va_end(ap);

	/* Write the message in one go so it's atomic */
	rc = write(STDERR_FILENO, buf, strlen(buf));
	/* suppress compiler warning about setting but not using rc */
	(void)rc;
}

////////////////////////////////////////////////////////////

/*
 * open() wrapper: complains and exits on failure, otherwise returns
 * the new descriptor.
 */
static
int
doopen(const char *path, int flags, int mode)
{
	int fd;

	fd = open(path, flags, mode);
	if (fd<0) {
		complain("%s", path);
		exit(1);
	}
	return fd;
}

/*
 * close() wrapper: complains and exits on failure. path is used only
 * for the message.
 */
static
void
doclose(const char *path, int fd)
{
	if (close(fd)) {
		complain("%s: close", path);
		exit(1);
	}
}

/*
 * Create path as an empty file (truncating it if it exists).
 */
static
void
docreate(const char *path)
{
	int fd;

	fd = doopen(path, O_WRONLY|O_CREAT|O_TRUNC, 0664);
	doclose(path, fd);
}

/*
 * remove() wrapper: complains on failure but does not exit. Once
 * remove() has failed with ENOSYS, later calls do nothing.
 */
static
void
doremove(const char *path)
{
	static int noremove;

	if (noremove) {
		return;
	}

	if (remove(path) < 0) {
		if (errno == ENOSYS) {
			/* Complain (and try) only once. */
			noremove = 1;
		}
		complain("%s: remove", path);
	}
}

/*
 * Return the size of path in bytes, exiting on failure.
 *
 * Tries stat(), then fstat(), then lseek(), moving on to the next
 * method only when the previous one fails with ENOSYS; an
 * unimplemented call is remembered so it is not attempted again.
 */
static
off_t
getsize(const char *path)
{
	struct stat buf;
	int fd;
	static int no_stat, no_fstat;

	if (!no_stat) {
		if (stat(path, &buf) == 0) {
			return buf.st_size;
		}
		if (errno != ENOSYS) {
			complain("%s: stat", path);
			exit(1);
		}
		/* Avoid further "Unknown syscall 81" messages */
		no_stat = 1;
	}

	fd = doopen(path, O_RDONLY, 0);
	if (!no_fstat) {
		if (fstat(fd, &buf) == 0) {
			close(fd);
			return buf.st_size;
		}
		if (errno != ENOSYS) {
			/* Was reported as "stat", which named the wrong call. */
			complain("%s: fstat", path);
			exit(1);
		}
		/* Avoid further "Unknown syscall 82" messages */
		no_fstat = 1;
	}

	/* otherwise, lseek */
	if (lseek(fd, 0, SEEK_END) >= 0) {
		buf.st_size = lseek(fd, 0, SEEK_CUR);
		if (buf.st_size >= 0) {
			/* Previously leaked: this path returned with fd open. */
			close(fd);
			return buf.st_size;
		}
	}
	complain("%s: getting file size with lseek", path);
	close(fd);
	exit(1);
}

/*
 * read() wrapper: exits on error, otherwise returns the byte count
 * (which may be short, or 0 at end of file).
 */
static
size_t
doread(const char *path, int fd, void *buf, size_t len)
{
	/* ssize_t, so a large transfer count is not truncated through int. */
	ssize_t result;

	result = read(fd, buf, len);
	if (result < 0) {
		complain("%s: read", path);
		exit(1);
	}
	return (size_t) result;
}

/*
 * Read exactly len bytes; a short read is treated as fatal.
 */
static
void
doexactread(const char *path, int fd, void *buf, size_t len)
{
	size_t result;

	result = doread(path, fd, buf, len);
	if (result != len) {
		complainx("%s: read: short count", path);
		exit(1);
	}
}

/*
 * Write exactly len bytes; an error or a short write is fatal.
 */
static
void
dowrite(const char *path, int fd, const void *buf, size_t len)
{
	/* ssize_t, so a large transfer count is not truncated through int. */
	ssize_t result;

	result = write(fd, buf, len);
	if (result < 0) {
		complain("%s: write", path);
		exit(1);
	}
	if ((size_t) result != len) {
		complainx("%s: write: short count", path);
		exit(1);
	}
}

/*
 * lseek() wrapper: complains and exits on failure.
 */
static
void
dolseek(const char *name, int fd, off_t offset, int whence)
{
	if (lseek(fd, offset, whence) < 0) {
		complain("%s: lseek", name);
		exit(1);
	}
}

#if 0 /* let's not require subdirs */
static
void
dochdir(const char *path)
{
	if (chdir(path) < 0) {
		complain("%s: chdir", path);
		exit(1);
	}
}

static
void
domkdir(const char *path, int mode)
{
	if (mkdir(path, mode) < 0) {
		complain("%s: mkdir", path);
		exit(1);
	}
}
#endif /* 0 */

/*
 * fork() wrapper: complains on failure but returns the (negative)
 * result so the caller can decide what to do.
 */
static
pid_t
dofork(void)
{
	pid_t pid;

	pid = fork();
	if (pid < 0) {
		complain("fork");
		/* but don't exit */
	}
	return pid;
}

////////////////////////////////////////////////////////////

/*
 * Wait for worker number guy (process pid). Returns 0 if it exited
 * with status 0; complains and returns -1 if waitpid fails, or the
 * worker was killed by a signal, or it exited nonzero.
 */
static
int
dowait(int guy, pid_t pid)
{
	int status, result;

	result = waitpid(pid, &status, 0);
	if (result < 0) {
		complain("waitpid");
		return -1;
	}
	if (WIFSIGNALED(status)) {
		complainx("proc %d: signal %d", guy, WTERMSIG(status));
		return -1;
	}
	assert(WIFEXITED(status));
	status = WEXITSTATUS(status);
	if (status) {
		complainx("proc %d: exit %d", guy, status);
		return -1;
	}
	return 0;
}

/*
 * Run func once in each of numprocs forked children (with "me" set to
 * the child's index), wait for all of them, and exit if any fork or
 * any child failed. phasename is used only in the failure message.
 */
static
void
doforkall(const char *phasename, void (*func)(void))
{
	int i, bad = 0;
	pid_t pids[numprocs];

	/* NOTE(review): reconstructed fork loop. */
	for (i=0; i<numprocs; i++) {
		pids[i] = dofork();
		if (pids[i] < 0) {
			bad = 1;
		}
		else if (pids[i] == 0) {
			/* child */
			me = i;
			func();
			exit(0);
		}
	}

	for (i=0; i<numprocs; i++) {
		/* Only wait for children that were actually started. */
		if (pids[i] > 0 && dowait(i, pids[i])) {
			bad = 1;
		}
	}

	if (bad) {
		complainx("%s failed.", phasename);
		exit(1);
	}
}

/*
 * Position fd at the first key belonging to this worker in a file of
 * numkeys ints split evenly across numprocs workers.
 */
static
void
seekmyplace(const char *name, int fd)
{
	int keys_per, myfirst;
	off_t offset;

	keys_per = numkeys / numprocs;
	myfirst = me*keys_per;
	offset = myfirst * sizeof(int);

	dolseek(name, fd, offset, SEEK_SET);
}

/*
 * Number of keys this worker handles: an even share, with the last
 * worker also taking the remainder.
 */
static
int
getmykeys(void)
{
	int keys_per, myfirst, mykeys;

	keys_per = numkeys / numprocs;
	myfirst = me*keys_per;
	mykeys = (me < numprocs-1) ? keys_per : numkeys - myfirst;

	return mykeys;
}

////////////////////////////////////////////////////////////

/*
 * Return the sum of all bytes in the file at path.
 */
static
unsigned long
checksum_file(const char *path)
{
	int fd;
	char buf[512];
	size_t count, i;
	unsigned long sum = 0;

	fd = doopen(path, O_RDONLY, 0);

	while ((count = doread(path, fd, buf, sizeof(buf))) > 0) {
		/* NOTE(review): reconstructed to the end of the function. */
		for (i=0; i<count; i++) {
			sum += (unsigned char) buf[i];
		}
	}

	doclose(path, fd);

	return sum;
}

////////////////////////////////////////////////////////////

/*
 * Per-worker random seeds, valid only while genkeys() is running.
 * NOTE(review): reconstructed.
 */
static long *seeds;

/*
 * Worker: fill this worker's slice of PATH_KEYS with random keys in
 * the open range (0, RANDOM_MAX).
 */
static
void
genkeys_sub(void)
{
	/* NOTE(review): reconstructed down to the bounds asserts. */
	int fd, i, mykeys, keys_done, keys_to_do, value;

	fd = doopen(PATH_KEYS, O_WRONLY, 0);

	mykeys = getmykeys();
	seekmyplace(PATH_KEYS, fd);

	srandom(seeds[me]);
	keys_done = 0;
	while (keys_done < mykeys) {
		keys_to_do = mykeys - keys_done;
		if (keys_to_do > WORKNUM) {
			keys_to_do = WORKNUM;
		}

		for (i=0; i<keys_to_do; i++) {
			value = random();

			// check bounds of value
			assert(value >= 0);
			assert(value <= RANDOM_MAX);

			// do not allow the value to be zero or RANDOM_MAX
			while (value == 0 || value == RANDOM_MAX) {
				value = random();
			}

			workspace[i] = value;
		}

		dowrite(PATH_KEYS, fd, workspace, keys_to_do*sizeof(int));
		keys_done += keys_to_do;
	}

	doclose(PATH_KEYS, fd);
}

/*
 * Create PATH_KEYS, have the workers fill it with random keys, then
 * check its size and record its checksum.
 */
static
void
genkeys(void)
{
	long seedspace[numprocs];
	int i;

	/* Create the file. */
	docreate(PATH_KEYS);

	/* Generate random seeds for each subprocess. */
	srandom(randomseed);
	/* NOTE(review): reconstructed to the end of the function. */
	for (i=0; i<numprocs; i++) {
		seedspace[i] = random();
	}

	/* Do it. */
	seeds = seedspace;
	doforkall("Initialization", genkeys_sub);
	seeds = NULL;

	/* Cross-check the size of the output. */
	if (getsize(PATH_KEYS) != correctsize) {
		complainx("%s: file is wrong size", PATH_KEYS);
		exit(1);
	}

	/* Checksum the output. */
	checksum = checksum_file(PATH_KEYS);
	complainx("Checksum of unsorted keys: %lu", checksum);
}

////////////////////////////////////////////////////////////

/*
 * Name of the bin file written by worker a for key range b. Returns a
 * pointer to a static buffer that the next call overwrites.
 *
 * NOTE(review): reconstructed; the name format is a guess.
 */
static
const char *
binname(int a, int b)
{
	static char rv[32];
	snprintf(rv, sizeof(rv), "bin-%d-%d", a, b);
	return rv;
}

/*
 * Name of the merged file for key range a. Returns a pointer to a
 * static buffer that the next call overwrites.
 *
 * NOTE(review): reconstructed; the name format is a guess.
 */
static
const char *
mergedname(int a)
{
	static char rv[32];
	snprintf(rv, sizeof(rv), "merged-%d", a);
	return rv;
}

/*
 * Worker: read this worker's slice of PATH_KEYS and distribute each
 * key into one of numprocs bin files according to its value range.
 */
static
void
bin(void)
{
	/* NOTE(review): reconstructed down to the read of the keys. */
	int infd, outfds[numprocs];
	const char *name;
	int i, mykeys, keys_done, keys_to_do;
	int key, pivot, binnum;

	infd = doopen(PATH_KEYS, O_RDONLY, 0);

	mykeys = getmykeys();
	seekmyplace(PATH_KEYS, infd);

	for (i=0; i<numprocs; i++) {
		name = binname(me, i);
		outfds[i] = doopen(name, O_WRONLY|O_CREAT|O_TRUNC, 0664);
	}

	/* Width of the key range assigned to each bin. */
	pivot = (RANDOM_MAX / numprocs);

	keys_done = 0;
	while (keys_done < mykeys) {
		keys_to_do = mykeys - keys_done;
		if (keys_to_do > WORKNUM) {
			keys_to_do = WORKNUM;
		}

		doexactread(PATH_KEYS, infd, workspace,
			    keys_to_do * sizeof(int));

		/* NOTE(review): reconstructed down to the asserts. */
		for (i=0; i<keys_to_do; i++) {
			key = workspace[i];

			if (key <= 0) {
				complainx("proc %d: garbage key %d", me, key);
				key = 0;
			}
			binnum = key / pivot;

			/*
			 * When RANDOM_MAX is not a multiple of numprocs
			 * the very largest keys divide out to numprocs;
			 * they belong in the last bin.
			 */
			if (binnum >= numprocs) {
				binnum = numprocs - 1;
			}

			assert(binnum >= 0);
			assert(binnum < numprocs);
			dowrite("bin", outfds[binnum], &key, sizeof(key));
		}

		keys_done += keys_to_do;
	}
	doclose(PATH_KEYS, infd);

	/* NOTE(review): reconstructed close loop. */
	for (i=0; i<numprocs; i++) {
		doclose(binname(me, i), outfds[i]);
	}
}

/*
 * Worker: sort, in memory, each of the bin files this worker wrote.
 * A bin that does not fit in the workspace is a fatal error.
 */
static
void
sortbins(void)
{
	/* NOTE(review): reconstructed down to the size check. */
	const char *name;
	int i, fd;
	off_t binsize;

	for (i=0; i<numprocs; i++) {
		name = binname(me, i);
		binsize = getsize(name);
		if (binsize % sizeof(int) != 0) {
			complainx("%s: bin size %ld no good", name,
				  (long) binsize);
			exit(1);
		}
		if (binsize > (off_t) sizeof(workspace)) {
			complainx("proc %d: %s: bin too large", me, name);
			exit(1);
		}

		fd = doopen(name, O_RDWR, 0);
		doexactread(name, fd, workspace, binsize);

		sortints(workspace, binsize/sizeof(int));

		dolseek(name, fd, 0, SEEK_SET);
		dowrite(name, fd, workspace, binsize);
		doclose(name, fd);
	}
}

/*
 * Worker: merge the already-sorted bins for this worker's key range
 * (one from every worker) into a single sorted merged file.
 */
static
void
mergebins(void)
{
	int infds[numprocs], outfd;
	int values[numprocs], ready[numprocs];
	const char *name, *outname;
	int i, result;
	int numready, place, val, worknum;

	outname = mergedname(me);
	outfd = doopen(outname, O_WRONLY|O_CREAT|O_TRUNC, 0664);

	/* NOTE(review): reconstructed down to assert(place >= 0). */
	for (i=0; i<numprocs; i++) {
		name = binname(i, me);
		infds[i] = doopen(name, O_RDONLY, 0);
		values[i] = 0;
		ready[i] = 0;
	}

	worknum = 0;

	while (1) {
		/* Make sure every still-open input has a value loaded. */
		numready = 0;
		for (i=0; i<numprocs; i++) {
			if (infds[i] < 0) {
				continue;
			}

			if (!ready[i]) {
				result = doread("bin", infds[i],
						&val, sizeof(int));
				if (result == 0) {
					/* This input is exhausted. */
					doclose("bin", infds[i]);
					infds[i] = -1;
					continue;
				}
				if ((size_t) result != sizeof(int)) {
					complainx("%s: read: short count",
						  binname(i, me));
					exit(1);
				}
				values[i] = val;
				ready[i] = 1;
			}
			numready++;
		}
		if (numready == 0) {
			break;
		}

		/* find the smallest */
		place = -1;
		for (i=0; i<numprocs; i++) {
			if (!ready[i]) {
				continue;
			}
			if (place < 0 || values[i] < val) {
				val = values[i];
				place = i;
			}
		}
		assert(place >= 0);

		workspace[worknum++] = val;
		if (worknum >= WORKNUM) {
			assert(worknum == WORKNUM);
			dowrite(outname, outfd, workspace,
				worknum * sizeof(int));
			worknum = 0;
		}
		/* The chosen input needs a fresh value next time around. */
		ready[place] = 0;
	}

	/* Flush whatever is left in the buffer. */
	dowrite(outname, outfd, workspace, worknum * sizeof(int));
	doclose(outname, outfd);

	for (i=0; i<numprocs; i++) {
		assert(infds[i] < 0);
	}
}

/*
 * Worker: copy this worker's merged file into its place in
 * PATH_SORTED by pointing stdout there and running /bin/cat.
 *
 * NOTE(review): reconstructed in full; confirm against upstream.
 */
static
void
assemble(void)
{
	off_t mypos;
	int i, fd;
	const char *args[3];

	/* Our data starts after all lower-numbered merged files. */
	mypos = 0;
	for (i=0; i<me; i++) {
		mypos += getsize(mergedname(i));
	}

	fd = doopen(PATH_SORTED, O_WRONLY, 0);
	dolseek(PATH_SORTED, fd, mypos, SEEK_SET);

	if (dup2(fd, STDOUT_FILENO) < 0) {
		complain("dup2");
		exit(1);
	}

	doclose(PATH_SORTED, fd);

	args[0] = "cat";
	args[1] = mergedname(me);
	args[2] = NULL;
	execv("/bin/cat", (char **) args);
	complain("/bin/cat: exec");
	exit(1);
}

/*
 * Check that the bin files together hold exactly correctsize bytes.
 *
 * NOTE(review): reconstructed in full; confirm against upstream.
 */
static
void
checksize_bins(void)
{
	off_t totsize;
	int i, j;

	totsize = 0;
	for (i=0; i<numprocs; i++) {
		for (j=0; j<numprocs; j++) {
			totsize += getsize(binname(i, j));
		}
	}
	if (totsize != correctsize) {
		complainx("Sum of bin sizes is wrong (%ld, should be %ld)",
			  (long) totsize, (long) correctsize);
		exit(1);
	}
}

/*
 * Check that the merged files together hold exactly correctsize bytes.
 *
 * NOTE(review): reconstructed in full; confirm against upstream.
 */
static
void
checksize_merge(void)
{
	off_t totsize;
	int i;

	totsize = 0;
	for (i=0; i<numprocs; i++) {
		totsize += getsize(mergedname(i));
	}
	if (totsize != correctsize) {
		complainx("Sum of merged sizes is wrong (%ld, should be %ld)",
			  (long) totsize, (long) correctsize);
		exit(1);
	}
}

/*
 * Run the whole sort: bin, sort bins, merge, assemble, then compare
 * the checksum of PATH_SORTED with the one recorded by genkeys().
 *
 * NOTE(review): reconstructed in full (only the call in main() was
 * visible); phase names and messages are guesses.
 */
static
void
sort(void)
{
	unsigned long sortedsum;
	int i, j;

	/* Step 1. Toss into bins. */
	doforkall("Tossing", bin);
	checksize_bins();
	complainx("Done tossing into bins.");

	/* Step 2: Sort the bins. */
	doforkall("Sorting", sortbins);
	checksize_bins();
	complainx("Done sorting the bins.");

	/* Step 3: Merge corresponding bins. */
	doforkall("Merging", mergebins);
	checksize_merge();
	complainx("Done merging the bins.");

	/* Step 3a: delete the bins */
	for (i=0; i<numprocs; i++) {
		for (j=0; j<numprocs; j++) {
			doremove(binname(i, j));
		}
	}

	/* Step 4: assemble output file */
	docreate(PATH_SORTED);
	doforkall("Final assembly", assemble);
	if (getsize(PATH_SORTED) != correctsize) {
		complainx("%s: file is wrong size", PATH_SORTED);
		exit(1);
	}

	/* Step 4a: delete the merged bins */
	for (i=0; i<numprocs; i++) {
		doremove(mergedname(i));
	}

	/* Step 5: Checksum the result. */
	sortedsum = checksum_file(PATH_SORTED);
	complainx("Checksum of sorted keys: %lu", sortedsum);

	if (sortedsum != checksum) {
		complainx("Sums do not match");
		exit(1);
	}
}

////////////////////////////////////////////////////////////

/*
 * Name of the validation summary file for worker a. Returns a pointer
 * to a static buffer that the next call overwrites.
 *
 * NOTE(review): reconstructed; the name format is a guess.
 */
static
const char *
validname(int a)
{
	static char rv[32];
	snprintf(rv, sizeof(rv), "valid-%d", a);
	return rv;
}

/*
 * Check that every validation file holds exactly two ints.
 *
 * NOTE(review): reconstructed in full; confirm against upstream.
 */
static
void
checksize_valid(void)
{
	off_t totsize, validsize;
	int i;

	validsize = (off_t) (2*numprocs*sizeof(int));

	totsize = 0;
	for (i=0; i<numprocs; i++) {
		totsize += getsize(validname(i));
	}
	if (totsize != validsize) {
		complainx("Sum of validation sizes is wrong "
			  "(%ld, should be %ld)",
			  (long) totsize, (long) validsize);
		exit(1);
	}
}

/*
 * Worker: scan this worker's slice of PATH_SORTED, reject any key
 * outside (0, RANDOM_MAX), and record the smallest and largest key
 * seen in this worker's validation file.
 */
static
void
dovalidate(void)
{
	/* NOTE(review): reconstructed down to the chunked read. */
	const char *name;
	int fd, i, mykeys, keys_done, keys_to_do;
	int key, smallest, largest;

	name = PATH_SORTED;
	fd = doopen(name, O_RDONLY, 0);

	mykeys = getmykeys();
	seekmyplace(name, fd);

	smallest = RANDOM_MAX;
	largest = 0;

	keys_done = 0;
	while (keys_done < mykeys) {
		keys_to_do = mykeys - keys_done;
		if (keys_to_do > WORKNUM) {
			keys_to_do = WORKNUM;
		}

		doexactread(name, fd, workspace, keys_to_do * sizeof(int));

		/*
		 * These are data errors, not system call failures, so
		 * they are reported without errno text.
		 * NOTE(review): the first two checks are reconstructed.
		 */
		for (i=0; i<keys_to_do; i++) {
			key = workspace[i];

			if (key < 0) {
				complainx("%s: found negative key", name);
				exit(1);
			}
			if (key == 0) {
				complainx("%s: found zero key", name);
				exit(1);
			}
			if (key >= RANDOM_MAX) {
				complainx("%s: found too-large key", name);
				exit(1);
			}

			if (key < smallest) {
				smallest = key;
			}
			if (key > largest) {
				largest = key;
			}
		}

		keys_done += keys_to_do;
	}
	doclose(name, fd);

	name = validname(me);
	fd = doopen(name, O_WRONLY|O_CREAT|O_TRUNC, 0664);
	dowrite(name, fd, &smallest, sizeof(smallest));
	dowrite(name, fd, &largest, sizeof(largest));
	doclose(name, fd);
}

/*
 * Validate PATH_SORTED: have each worker summarize its slice, then
 * check that every summary is in range and that each slice's smallest
 * key is not below the previous slice's largest key.
 */
static
void
validate(void)
{
	int smallest, largest, prev_largest;
	int i, fd;
	const char *name;

	doforkall("Validation", dovalidate);
	checksize_valid();

	prev_largest = 1;

	for (i=0; i<numprocs; i++) {
		/* NOTE(review): reconstructed down to the LARGEST check. */
		name = validname(i);
		fd = doopen(name, O_RDONLY, 0);

		doexactread(name, fd, &smallest, sizeof(int));
		doexactread(name, fd, &largest, sizeof(int));

		if (smallest < 1) {
			complainx("Validation: block %d: bad SMALLEST", i);
			exit(1);
		}
		if (largest >= RANDOM_MAX) {
			complainx("Validation: block %d: bad LARGEST", i);
			exit(1);
		}
		if (smallest > largest) {
			complainx("Validation: block %d: SMALLEST > LARGEST",
				  i);
			exit(1);
		}

		if (smallest < prev_largest) {
			/* Data errors: no errno text wanted here. */
			complainx("Validation: block %d smallest key %d",
				  i, smallest);
			complainx("Validation: previous block largest key %d",
				  prev_largest);
			complainx("Validation failed");
			exit(1);
		}

		/*
		 * Carry this block's maximum forward; without this the
		 * ordering check above only ever compares against 1.
		 */
		prev_largest = largest;

		/* Previously leaked: one descriptor per block. */
		doclose(name, fd);
	}

	/* NOTE(review): reconstructed cleanup loop. */
	for (i=0; i<numprocs; i++) {
		doremove(validname(i));
	}
}

////////////////////////////////////////////////////////////

/*
 * Enter the test directory. Currently a no-op.
 *
 * NOTE(review): reconstructed in full; only the call in main() was
 * visible.
 */
static
void
setdir(void)
{
#if 0 /* let's not require subdirs */
	domkdir(PATH_TESTDIR, 0775);
	dochdir(PATH_TESTDIR);
#endif /* 0 */
}

/*
 * Remove the key and output files left behind by the run.
 *
 * NOTE(review): reconstructed in full; only the call in main() was
 * visible.
 */
static
void
unsetdir(void)
{
	doremove(PATH_KEYS);
	doremove(PATH_SORTED);
#if 0 /* let's not require subdirs */
	dochdir("..");

	if (rmdir(PATH_TESTDIR) < 0) {
		complain("%s: rmdir", PATH_TESTDIR);
		/* but don't exit */
	}
#endif /* 0 */
}

////////////////////////////////////////////////////////////

/*
 * Replace randomseed with bytes read from PATH_RANDOM.
 *
 * NOTE(review): reconstructed in full; confirm against upstream.
 */
static
void
randomize(void)
{
	int fd;

	fd = doopen(PATH_RANDOM, O_RDONLY, 0);
	doexactread(PATH_RANDOM, fd, &randomseed, sizeof(randomseed));
	doclose(PATH_RANDOM, fd);
}

/*
 * Print a usage message and exit.
 *
 * NOTE(review): reconstructed in full; confirm against upstream.
 */
static
void
usage(void)
{
	complainx("Usage: %s [-p procs] [-k keys] [-s seed] [-r]", progname);
	exit(1);
}

/*
 * Parse the command line: -p procs, -k keys, -s seed (each accepting
 * the value attached or as the next argument) and -r to take the seed
 * from PATH_RANDOM.
 *
 * NOTE(review): reconstructed in full; only the call in main() was
 * visible, so the option letters themselves are a guess.
 */
static
void
doargs(int argc, char *argv[])
{
	int i, ch, val, arg;

	for (i=1; i<argc; i++) {
		if (argv[i][0] != '-') {
			usage();
			return;
		}
		ch = argv[i][1];
		switch (ch) {
		    case 'p': arg = 1; break;
		    case 'k': arg = 1; break;
		    case 's': arg = 1; break;
		    case 'r': arg = 0; break;
		    default: usage(); return;
		}
		if (arg) {
			if (argv[i][2]) {
				val = atoi(argv[i]+2);
			}
			else {
				i++;
				if (!argv[i]) {
					complainx("Option -%c requires an "
						  "argument", ch);
					exit(1);
				}
				val = atoi(argv[i]);
			}
			switch (ch) {
			    case 'p': numprocs = val; break;
			    case 'k': numkeys = val; break;
			    case 's': randomseed = val; break;
			    default: assert(0); break;
			}
		}
		else {
			switch (ch) {
			    case 'r': randomize(); break;
			    default: assert(0); break;
			}
		}
	}

	if (numprocs < 1) {
		complainx("Invalid value %d for -p", numprocs);
		exit(1);
	}
	if (numkeys < 1) {
		complainx("Invalid value %d for -k", numkeys);
		exit(1);
	}
}

/*
 * Generate the keys, sort them in parallel, validate the result and
 * clean up. Any failure along the way exits with status 1.
 */
int
main(int argc, char *argv[])
{
	/* NOTE(review): the signature is reconstructed. */
	initprogname(argc > 0 ? argv[0] : NULL);

	doargs(argc, argv);
	correctsize = (off_t) (numkeys*sizeof(int));

	setdir();

	genkeys();
	sort();
	validate();
	complainx("Succeeded.");

	unsetdir();

	return 0;
}