/*
 * buffer-copy.c
 *   Copies data efficiently from its stdin to its stdout.
 *   Works by spawning two subprocesses, one for reading, and one for
 *   writing.  That way, both read and write operations can be run in
 *   paralell, rather than sequentially.
 *   At ltnb, we use this send entire disks over our network for backup
 *   purposes.  A 10 Gig disk can be backed up in 20 minutes that way
 *
 *   Usage: buffer-copy [-i inputbufferSize] [-o outputbufferSize] 
 *          [-s sleepDelay] [-a amount] [-t targetFile] [-v] <inputFile
 *
 *   -i inputSize    the size of the input buffer (that many bytes are read
 *                   at once from input file
 *   -o outputSize   the size of the output buffer (that many bytes are
 *                   written at once to the output file
 *   -s sleepDelay   delay, expressed in microseconds, during which the 2
 *                   threads wait to synchronize.  Mainly intended for
 *                   testing, it turned out that this only had a minor
 *                   impact on performance.
 *                   Default 100us.
 *   -t targetFile   where to write the data to.  For efficiency, this is
 *                   opened with the O_SYNC flag
 *   -a amount       number of input buffers copied.  Mainly intended for
 *                   performance testing (if you don't want to wait for
 *                   the whole 20 minutes to see if a tweak helped or not).
 *   -v              verbose.  Displays a running count of copied output
 *                   buffers.
  *
  *   All sizes can be expressed in bytes (b), kilobytes (k) or megabytes (m).
  *
  * Example:
  *   buffer-copy -i 2m -o 2m -v </dev/hda | rsh ltnb5 buffer-copy -i 2m \
  *               -o 2m -t /dev/hda
  *
  *  Attention: all filesystems on /dev/hda on the source machine should be
  *  mounted read-only.  Filesystem on /dev/hda on the target machine
  *  shouldn't be mounted at all.  Both /dev/hda's on the source and target
  *  machine should have the same geometry.
  *
  * alain@linux.lu
  */


#include <sys/types.h>
#include <stdlib.h>
#include <unistd.h>
#include <getopt.h>
#include <stdio.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/wait.h>

#define NBUF 10

#define DEBUG 0

char *progname;
char hostname[100];
int verbose=0;
int sleeptime = 100;
int ofd = 1;
struct sem { 
    volatile int readPtr;
    volatile int writePtr; 
    volatile int readAmount[NBUF];
};

void usage(void) __attribute__ ((noreturn));
void usage(void) {
    fprintf(stderr, "Usage: %s [-i inputbufferSize] [-o outputbufferSize] [-s sleepDelay] [-a amount] [-t targetFile] [-v] \n", progname);
    exit(1);
}


unsigned int size(char *arg) {
    unsigned int s;
    char *ptr;

    s = strtoul(arg, &ptr, 0);
    switch(*ptr) {
	case 'k': 
	    return s * 1024;
	case '\0':
	case 'b':
	    return s;
	case 'm':
	    return s * 1024 * 1024;
	default:
	    fprintf(stderr, "Bad quantitiy %s\n", arg);
	    exit(1);
    }
}


void writer(struct sem *sem, char *buffer, int obufsize, int blocksize) {
    unsigned int amount;
    int len;
    int offset;
    int sel;
    int wsize;

    while(sem->writePtr >= sem->readPtr) {
	usleep(sleeptime);
    }
    if(verbose /*&& sem->writePtr % 10 == 0*/)
    	fprintf (stderr, "\r%06d", sem->writePtr);
    fflush(stderr);
#if DEBUG
    fprintf(stderr, "Writer: w=%d r=%d\n", sem->writePtr, sem->readPtr);
#endif
    sel = sem->writePtr % NBUF;

    amount = sem->readAmount[sel];

#if DEBUG
    fprintf(stderr, "Amount = %d\n", amount);
#endif

    if (! amount) {
	/* reader signalled us EOF */
	fprintf(stderr, "\n");
	exit(0);
    }

    offset = sel * blocksize;
    while(amount) {
	wsize = amount;
	if (wsize > obufsize)
	    wsize = obufsize;
	len = write(ofd, buffer + offset, wsize);
	if (len < 0) {
	    perror("write");
	    sem->writePtr = 0;
	    exit(1);
	}
	amount -= len;
	offset += len;
    }
    sem->writePtr++;   
}


void reader(struct sem *sem, char *buffer, int ibufsize, int blocksize) {
    int offset;
    int sel;
    char *lbuffer;
    int len;
    int rsize;    

    while(sem->readPtr >= sem->writePtr + NBUF && sem->writePtr) {
	usleep(sleeptime);
    }

    if(!sem->writePtr) {
	/* writer signalled us EOF */
	fprintf(stderr, "writer signalled EOF\n");
	exit(0);
    }

    sel = sem->readPtr % NBUF;
    lbuffer = buffer + blocksize * sel;
   
    offset = 0;
    while(offset < blocksize) {
	rsize = blocksize - offset;
	if (rsize > ibufsize)
	    rsize = ibufsize;
	len = read(0, lbuffer + offset, rsize);
	if (len == 0)
	    break;
	if (len < 0) {
	    perror("read");
	    sem->readAmount[sel] = 0;
	    sem->readPtr ++;
	    exit(1);
	}
	offset += len;
    }

#if DEBUG
    fprintf(stderr, "Read %04x bytes, rptr=%d wptr=%d %02x %02x\n",
	    offset, sem->readPtr, sem->writePtr,
	    (unsigned char) lbuffer[0], (unsigned char) lbuffer[1]);
#endif


    sem->readAmount[sel] = offset;
    sem->readPtr ++;

#if DEBUG
    fprintf(stderr, "Reader: offset=%d ptr=%d\n", offset, sem->readPtr);
#endif

    if(offset == 0)
	exit(0);
}

int main(int argc, char **argv)
{
    int rpid, wpid;
    int shmid = -1;
    int zero = open("/dev/zero", O_RDWR);
    int ibufsize = 0;
    int obufsize = 0;
    int blocksize = 0;
    char *ptr;
    struct sem  *sem;
    char *buffer;
    int status;
    int amount=0;

    progname = argv[0];


    if (zero < 0) {
	perror("open /dev/zero");
	exit(1);
    }

    gethostname(hostname, 99);

    while(1) {
	switch(getopt(argc, argv, "vi:o:a:s:t:")) {
	    case 'v':
		verbose = 1;
		break;
	    case 'a':
		amount = size(optarg);
		break;
	    case 'i':
		ibufsize = size(optarg);
		break;
	    case 'o':
		obufsize = size(optarg);
		break;
	    case 's':
		sleeptime = size(optarg);
		break;
	    case 't':
		ofd = open(optarg, O_WRONLY | O_SYNC);
		if(ofd < 0 ) {
			perror("open");
		}
		break;
	    case '?':
		fprintf(stderr, "Unknown flag\n");
		usage();
	    case -1:
		goto end_while;
	}
    }
 end_while:

    if (ibufsize == 0) {
	fprintf(stderr, "Missing input buffer size\n");
	exit(1);
    }
    if (obufsize == 0) {
	fprintf(stderr, "Missing output buffer size\n");
	exit(1);
    }

    if (ibufsize > obufsize) {
	blocksize = ibufsize;
    } else {
	blocksize = obufsize;
    }

    fprintf(stderr,
	    "ibuf=%04x obuf=%04x blk=%04x\n", ibufsize, obufsize, blocksize);


    shmid = shmget(0, blocksize * NBUF + 4096, IPC_CREAT | 0600);
    if (shmid < 0) {
	perror("shmget");
	exit(1);
    }

    ptr = shmat(shmid, 0, 0);
    

/*
    ptr = mmap(0,  4096 + 2 * blocksize, PROT_READ | PROT_WRITE, 
	       MAP_SHARED | MAP_ANON, zero, 0);
*/

    if ((int) ptr == -1 || ptr == 0) {
	perror("mmap");
	exit(1);
    }


    buffer = ptr + 4096;
    sem = (struct sem *) ptr;


    sem->readPtr = 1;
    sem->writePtr = 1;

    switch( (rpid=fork()) ) {
	case 0: /* the child */
	    if (amount) {
		int i;
		for(i=0; i < amount ; i++) {
		    reader(sem, buffer, ibufsize, blocksize);
		}
		sem->readAmount[sem->readPtr % NBUF] = 0;
		sem->readPtr++;
	    } else {
		while(1) {
		    reader(sem, buffer, ibufsize, blocksize);
		}
	    }
	    exit(0);
	case -1: /* an error */
	    perror("fork");
	    exit(1);
	default: /* the father */	    
    }



    switch( (wpid=fork()) ) {
	case 0: /* the child */
	    while(1) {
		writer(sem, buffer, obufsize, blocksize);
	    }
	    exit(0);
	case -1: /* an error */
	    perror("fork");
	    exit(1);
	default: /* the father */
    }

    waitpid(rpid, &status, 0);
    fprintf(stderr, "Reader %s exited %d\n", hostname, WEXITSTATUS(status));
    fprintf(stderr, "%d %d %d\n", sem->readPtr, sem->writePtr,
	    sem->readAmount[sem->readPtr % NBUF]);
    waitpid(wpid, &status, 0);
    fprintf(stderr, "Writer %s exited %d\n", hostname, WEXITSTATUS(status)); 
    if(shmctl(shmid, IPC_RMID, 0)) {
	perror("remove shared segment");
	exit(1);
    }
    exit(0);
}
