This is a service provided by indexloc; do not enter any passwords.
Skip to content

prof123/mapreduce

 
 

Folders and files

Name
Last commit message
Last commit date

Latest commit

 

History

18 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

diff --git a/MAP.h b/MAP.h
index 840eb78..bd16dee 100644
--- a/MAP.h
+++ b/MAP.h
@@ -2,23 +2,17 @@
  * Header file for the MAP tasks??
  */
 
-typedef void* map_key_t ;
-typedef void* map_val_t ;
+typedef char* map_key_t ;
+typedef char* map_val_t ;
 
-/**
- * The key, value pairs handled in MAP. MAP gets this from the user defined map functions, and deals with these types throughout - hashing etc
- */
 struct MAP_keyval {
 	map_key_t key ;
 	map_val_t value ;
 };
 
-/**
- * This is what the user-defined map function spits out - a dynamic array of key-value pairs, and a count of the number of pairs */
-
 struct MAP_out {
 	int count ;  	//number of key-value pairs in the dynamic array
-	DYNARRAY  data ; //actual mapped list  //K: add another *, before data.
+	DYNARRAY data ; //actual mapped list
 };
 	
 
diff --git a/MAP.h~ b/MAP.h~
index beba89d..15a0fc3 100644
--- a/MAP.h~
+++ b/MAP.h~
@@ -2,19 +2,15 @@
  * Header file for the MAP tasks??
  */
 
-typedef void* map_key_t ;
-typedef void* map_val_t ;
+typedef char* map_key_t ;
+typedef char* map_val_t ;
 
-struct MAP_keyval {
+struct map_keyval {
 	map_key_t key ;
 	map_val_t value ;
 };
 
-struct MAP_out {
-	int count ;  	//number of key-value pairs in the dynamic array
-	DYNARRAY  data ; //actual mapped list  //K: add another *, before data.
-};
-	
+
 
 
 
diff --git a/da_test.c~ b/da_test.c~
index 4870406..f5ede75 100644
--- a/da_test.c~
+++ b/da_test.c~
@@ -3,7 +3,7 @@
 #include <stdio.h>
 
 int main() {
-	DYNARRAY a = new_array(sizeof(ELEMENT));
+	DYNARRAY a = new_array(sizeof(char));
 	int c = insert_into(a,32);
 	printf("%d\n",a->DATA[0]);
 }
diff --git a/dyn_array.c b/dyn_array.c
index 0eef029..14951ee 100644
--- a/dyn_array.c
+++ b/dyn_array.c
@@ -1,7 +1,4 @@
 #include "dyn_array.h"
-/**
- * This is the dynamic array implementation used
- */
 
 DYNARRAY new_array (size_t elem_size) 
 {
@@ -13,7 +10,6 @@ DYNARRAY new_array (size_t elem_size)
 	return array ;
 }
 
-
 int insert_into (DYNARRAY array , ELEMENT e)  //size of the element
 {
 	DYNARRAY v ;
@@ -27,7 +23,7 @@ int insert_into (DYNARRAY array , ELEMENT e)  //size of the element
 	max = v->max ;
 	if (count >= max) {  /* reached max limit allocated */
 		v->max = ((v->max)*2) + 1 ;
-		new_size = (v->max)*(v->elem_size) ; //
+		new_size = (v->max)*(v->elem_size) ;
 		v->DATA = (ELEMENT*) realloc(v->DATA , new_size);
 	}
 	v->DATA[count] = e ;
@@ -37,21 +33,6 @@ int insert_into (DYNARRAY array , ELEMENT e)  //size of the element
 }
 	
 
-/**
- * TODO:Evaluate this
- */
-void* get_i(DYNARRAY array, int position)
-{
-	void* v = (void*) array ;
-//Do not know the element type etc. Hmm..
-//
-	size_t size = array->elem_size ;
-	unsigned int offset = size*position ;
-	void* r = v+offset ;
-	return r ;
-/*It's critical that the offset is correct :P. Should be able to test this using dyn_test.So another TODO. We are returning a pointer to the caller (MAP actually) and assuming that it knows the size of each element so it only reads those many bytes and there's no overlap etc etc.
- */
 
-}
-}
+
 
diff --git a/dyn_array.c~ b/dyn_array.c~
index 0eef029..14951ee 100644
--- a/dyn_array.c~
+++ b/dyn_array.c~
@@ -1,7 +1,4 @@
 #include "dyn_array.h"
-/**
- * This is the dynamic array implementation used
- */
 
 DYNARRAY new_array (size_t elem_size) 
 {
@@ -13,7 +10,6 @@ DYNARRAY new_array (size_t elem_size)
 	return array ;
 }
 
-
 int insert_into (DYNARRAY array , ELEMENT e)  //size of the element
 {
 	DYNARRAY v ;
@@ -27,7 +23,7 @@ int insert_into (DYNARRAY array , ELEMENT e)  //size of the element
 	max = v->max ;
 	if (count >= max) {  /* reached max limit allocated */
 		v->max = ((v->max)*2) + 1 ;
-		new_size = (v->max)*(v->elem_size) ; //
+		new_size = (v->max)*(v->elem_size) ;
 		v->DATA = (ELEMENT*) realloc(v->DATA , new_size);
 	}
 	v->DATA[count] = e ;
@@ -37,21 +33,6 @@ int insert_into (DYNARRAY array , ELEMENT e)  //size of the element
 }
 	
 
-/**
- * TODO:Evaluate this
- */
-void* get_i(DYNARRAY array, int position)
-{
-	void* v = (void*) array ;
-//Do not know the element type etc. Hmm..
-//
-	size_t size = array->elem_size ;
-	unsigned int offset = size*position ;
-	void* r = v+offset ;
-	return r ;
-/*It's critical that the offset is correct :P. Should be able to test this using dyn_test.So another TODO. We are returning a pointer to the caller (MAP actually) and assuming that it knows the size of each element so it only reads those many bytes and there's no overlap etc etc.
- */
 
-}
-}
+
 
diff --git a/dyn_array.h b/dyn_array.h
index 5ed665c..859f892 100644
--- a/dyn_array.h
+++ b/dyn_array.h
@@ -6,9 +6,7 @@
 /**
  * Dynamic array implementation
  */
-/**
- * READ THE GODDAMN CODE BEFORE GETTING STUCK HERE FOR 3 WEEKS! >.<
- */
+
 #ifndef ELEMENT
 typedef char ELEMENT ;
 #endif
@@ -16,7 +14,7 @@ typedef char ELEMENT ;
 struct dynarry_struct {
 	int count ;
 	int max ;
-	size_t elem_size ;  //SEE THIS? YES SIZE
+	size_t elem_size ;
 	ELEMENT* DATA ;
 	
 }; 
@@ -28,7 +26,6 @@ DYNARRAY new_array (size_t elem_size) ;
 
 int insert_into (DYNARRAY array , ELEMENT e) ;
 
-/**THis is for the case if we dont want statically determined types. Array access cant be so straight forward. Also notice the return type, it is a void*, a pointer to some sequence of bytes.*/
-void get_i(DYNARRAY array, int position) ;
+
 
 
diff --git a/dyn_array.h~ b/dyn_array.h~
index 859f892..4156133 100644
--- a/dyn_array.h~
+++ b/dyn_array.h~
@@ -8,7 +8,7 @@
  */
 
 #ifndef ELEMENT
-typedef char ELEMENT ;
+extern ELEMENT ;
 #endif
 
 struct dynarry_struct {
diff --git a/mapReduce.c b/mapReduce.c
index f285389..a8871e1 100755
--- a/mapReduce.c
+++ b/mapReduce.c
@@ -12,29 +12,65 @@
 #include "MAP.h"
 #include "dyn_array.h"
 
-int main(int argc, char** argv)
+
+void mapReduce(int files, char **fnames, int nprocesses, int rprocesses, void *mapfunc, void *reducefunc)
 {
-	/**
-	 * TODO:Spawn all the mpi stuff here..
-	 * TODO:fill the options for MAP.
-	 */
-	if( is_mapper(myrank)) {
-
-		initialize_HASH_table() ;
-		MAP(myrank,OPTIONS,udf) ;
-	}
+	int numtasks, rank, source=0, dest, tag=1, i, amount;
+	char buf[LINE_MAX], **bucketfnames, bucketfname[LINE_MAX];
+	MPI_File file, *bucketfiles;
+	MPI_Offset size, startpoint;
+	
+	bzero(buf, LINE_MAX);
 
-	if(is_reducer(myrank)) {
+	MPI_Status stat;
 
+	MPI_Init(NULL,NULL);
+	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
+	
+	if(MPI_File_open(MPI_COMM_WORLD, fnames[0], MPI_MODE_RDONLY | MPI_MODE_UNIQUE_OPEN, MPI_INFO_NULL, &file) < 0) {
+		perror("File open");
+		exit(0);
+	}
+	
+	bucketfnames = malloc(rprocesses * sizeof(char *));
+	//bucketfiles = malloc(rprocesses * sizeof(MPI_File));
+	for(i = 0; i < rprocesses; i++) {
+		sprintf(bucketfname, "file%d", i);
+		bucketfnames[i] = malloc(strlen(bucketfname)+1);
+		sprintf(bucketfnames[i], "%s", bucketfname);
 	}
-}
 
+  	MPI_File_get_size(file, &size);
+  	startpoint = size / numtasks * rank;
+  	
+  	if(rank != numtasks - 1)
+  		amount = size / numtasks * (rank + 1) - startpoint;
+  	else
+  		amount = size - startpoint;
+  	
+    MPI_File_read_at(file, startpoint, buf, amount, MPI_CHAR, &stat);
+  	
+  	MAP (buf, mapfunc, reducefunc, rprocesses, bucketfnames, stat, rank, numtasks);
+  	
+ /* 	printf("rank = %d, data = ", rank);
+  	for(i = 0; i < amount; i++)
+  		printf(" %c",buf[i]);
+  	printf("\n");*/
+  	//MPI_File filee;
+  	printf("The End\n");
+	MPI_Barrier( MPI_COMM_WORLD );
+	printf("Closing\n");
+	
+	MPI_File_close(&file);
+	MPI_Finalize();
+}
 
 void MAP (int rank, struct options OPTIONS, struct udef_functions udf)
 {
 	void* mapfun = udf.mapfunc ;
 	
-	struct MAP_out map_output ; 	//Store the output of udefmap
+	struct MAP_out map_output ;
      	map_output = (*mapfunc)(rank) ;
 	/* User does the input split based on rank. */
 	/* And does dynamic array stuff.*/
@@ -44,19 +80,14 @@ void MAP (int rank, struct options OPTIONS, struct udef_functions udf)
 	int i = 0 ;
 	struct MAP_keyval keyval ;
 
-/*Careful!We now assume that the type of the keyval etc is known. 
- * If we go the size way then modify dynamic array implementation slightly
- */
 	while (i < output.count) {
-
-		keyval = output[i] ;
-
+		keyval = out.put[i] ;
 		HASH (keyval) ;
-
 		i++ ;
 	}
   	
-	populate_tag_tab() ;
+	MPI_Barrier( MPI_COMM_WORLD );
+
 }
 
 
@@ -69,13 +100,9 @@ void initialize_HASH_table()
 }
 
 /**
- * Hash given keyval pair into the Hash_Table. Assumes we have the keycompare function
- * keycmp.Should the user gives that?
+ * Hash given keyval pair into the Hash_Table
  */
 
-/**
- * TODO: CHECK this function for pointer safety and logic.
- */
 int HASH (struct MAP_keyval keyval) 
 {
 	map_key_t key ;
@@ -110,9 +137,6 @@ struct tag_entry[TAG_MAX] tag_table ;
 /**
  * goes through hash table and populates tag table..
  */
-/*
- * TODO: CHECK for logical loopholes please
- */
 int populate_tag_tab() 
 {
 	HTABLE curr_key = Hash_Table ;
@@ -132,20 +156,371 @@ int populate_tag_tab()
 int is_mapper(int rank) {
 	return 1 ;
 }
-
 /* TODO */
 int is_reducer(int rank) {
 	return 1;
 }
 
-/* TODO */
-int is_merger(int rank) {
-	return 1;
+void reduce(MPI_File file, MPI_Status status, void (*reducefunc)(char*, char*)) {
+	int i;
+	struct reducer_t* first, *temp;
+	intoReduceType(file, &first, status);
+
+
+	printf("All the keys in the end:\n");
+	
+	temp = first;
+	while(temp != NULL) {
+			
+		printf("stored key:%s, size:%d\n", temp->key, temp->size);
+		for(i = 0; i < temp->size; i++)
+			printf("value[%d]:%s\n", i, temp->vals[i]);
+				
+		temp = temp->next;
+	}
+
+	/*BEGIN: prateek
+ 	*Now onto the actual reduce. We are finished with grouping by key now***/
+	temp=first; //first is the first 'bucket'
+	while(temp!=NULL)  {
+		int i=0;
+ 		char* reduced_val = temp->vals[0];  //the first value. ?
+
+		for(i=0; i<(temp->size-1); i++) {
+			//userdefReduce(reduced_val , temp->vals[i+1]);
+			(*reducefunc)(reduced_val, temp->vals[i+1]);
+		}
+
+		printf("KEY:%s , VALUE:%s\n",temp->key,reduced_val);
+
+ 		temp=temp->next;
+ 	}
+
+
 }
 
-/**
- * Not needed presently but a simple hash function is always useful!
+/****REDUCED****/
+	
+
+/*user defined Reduce function*** - simple addition*/
+/*void userdefReduce(char* inout, char* in)
+{
+	int i=atoi(in);
+	int j=atoi(inout);
+	sprintf(inout, "%d", (i+j));
+}
+*/
+/* itoa:  convert n to characters in s. picked from K&R. */
+/*void itoa(int n, char s[])
+{ 
+    int i, sign;
+
+    if ((sign = n) < 0)  /* record sign */
+ /*       n = -n;          /* make n positive */
+ /*   i = 0;
+    do {       /* generate digits in reverse order */
+  /*      s[i++] = n % 10 + '0';   /* get next digit */
+   /* } while ((n /= 10) > 0);     /* delete it */
+/*    if (sign < 0)
+        s[i++] = '-';
+    s[i] = '\0';
+    reverse(s);
+} 
+
+/* reverse:  reverse string s in place.Again K&R. */
+/*void reverse(char s[])
+{
+    int c, i, j;
+
+    for (i = 0, j = strlen(s)-1; i<j; i++, j--) {
+        c = s[i];
+        s[i] = s[j];
+        s[j] = c;
+    }
+}*/
+/*END: prateek */
+
+
+int intoReduceType(MPI_File file, struct reducer_t** firstt, MPI_Status status) {
+	char buf[LINE_MAX], *bufptr, keybuf[100], sizebuf[100], **vals, valsbuf[100][100], valbuf[100];
+	int ret = 0, r, atnow = 0, valsize, values, readbytes, i, a;
+	MPI_Offset startpoint = 0, fsize;
+	struct reducer_t *temp = NULL, *first = NULL, *next;
+	
+	bzero(buf, LINE_MAX);
+	//MPI_File_set_view( file, 0, MPI_CHAR, MPI_CHAR, "native", MPI_INFO_NULL );
+	//MPI_File_read(file, buf, LINE_MAX, MPI_CHAR, &status);
+	
+	MPI_File_get_size(file, &fsize);
+	MPI_File_read_at(file, startpoint, buf, fsize, MPI_CHAR, &status);
+	
+	printf("printing buffer, size = %d: ", fsize);
+  	for(r = 0; r < fsize; r++) {
+  		if(buf[r] == '\0')
+  			printf("_");
+  		else
+  			printf("%c", buf[r]);
+  	}
+  	printf("buffer printed\n");
+	//exit(0);
+	if(strlen(buf) == 0)
+		return 0;
+	
+	bufptr = buf;
+	while(1) {
+		if((atnow+1) >= fsize)
+			break;
+		
+		//bufptr = &buf[atnow];
+		
+		bzero(keybuf, 100);
+		memcpy(keybuf, bufptr, strlen(bufptr));
+		atnow += strlen(keybuf) +1;
+		bufptr = &buf[atnow];
+		
+		bzero(sizebuf, 100);
+		memcpy(sizebuf, bufptr, strlen(bufptr));
+		atnow += strlen(sizebuf) +1;
+		bufptr = &buf[atnow];
+		
+		valsize = atoi(sizebuf);
+		values = 0;
+		readbytes = 0;
+		while(1) {
+			bzero(valbuf, 100);
+			memcpy(valbuf, bufptr, strlen(bufptr));
+			values++;
+			strcpy(valsbuf[values-1], valbuf);
+			readbytes += strlen(valbuf)+1;
+			
+			atnow += strlen(valbuf)+1;
+			bufptr = &buf[atnow];
+			
+			if(readbytes == valsize)
+				break;
+		}/*
+		printf("key:%s, size:%d\n", keybuf, values);
+		for(i = 0; i < values; i++)
+			printf("value[%d]:%s\n", i, valsbuf[i]);
+			*/
+	//	vals = malloc(values * sizeof(char *));
+		//for(i = 0; i < values; i++) {
+		//	vals[i] = malloc(strlen(valsbuf[i]) * sizeof(char));
+		//	strcpy(vals[i], valsbuf[i]);	
+		//}
+		
+		temp = first;
+		
+					// This is for the first key.
+		if(temp == NULL) {
+			temp = malloc(sizeof(struct reducer_t));
+			temp->next = NULL;
+			temp->size = values;
+			temp->key = malloc((strlen(keybuf)+1) * sizeof(char));
+			strcpy(temp->key, keybuf);
+			
+			temp->vals = malloc(values * sizeof(char *));
+			for(i = 0; i < values; i++) {
+				temp->vals[i] = malloc((strlen(valsbuf[i])+1) * sizeof(char));
+				strcpy(temp->vals[i], valsbuf[i]);	
+			}
+			
+			first = temp;
+			/*
+			printf("first stored key:%s, size:%d\n", temp->key, temp->size);
+			for(i = 0; i < temp->size; i++)
+				printf("value[%d]:%s\n", i, temp->vals[i]);
+			
+			printf("position now:%d\n",atnow);*/
+			
+			continue;
+		}
+		
+		
+		//printf("");
+		
+		
+		
+		while(1) {
+			if(strcmp(temp->key, keybuf) == 0) {
+				char **copy = temp->vals;
+				temp->vals = malloc((temp->size+values) * sizeof(char *));
+				for(i = 0; i < temp->size; i++) {
+					temp->vals[i] = malloc((strlen(copy[i])+1) * sizeof(char));
+					strcpy(temp->vals[i], copy[i]);
+				}
+				for(a = 0;i < (temp->size+values); a++, i++) {
+					temp->vals[i] = malloc((strlen(valsbuf[a])+1) * sizeof(char));
+					strcpy(temp->vals[i], valsbuf[a]);
+				}
+				for(i = 0; i < temp->size; i++)
+					free(copy[i]);
+				free(copy);
+				
+				temp->size += values;
+				
+				/*
+				printf("appended key:%s, size:%d\n", temp->key, temp->size);
+				for(i = 0; i < temp->size; i++)
+					printf("value[%d]:%s\n", i, temp->vals[i]);
+					
+				printf("position now:%d\n",atnow);*/
+				break;
+			}
+			else if(temp->next == NULL) {
+				next = malloc(sizeof(struct reducer_t));
+				next->next = NULL;
+				next->size = values;
+				next->key = malloc((strlen(keybuf)+1) * sizeof(char));
+				strcpy(next->key, keybuf);
+				
+				next->vals = malloc(values * sizeof(char *));
+				for(i = 0; i < values; i++) {
+					next->vals[i] = malloc((strlen(valsbuf[i])+1) * sizeof(char));
+					strcpy(next->vals[i], valsbuf[i]);	
+				}
+				
+				temp->next = next;
+				/*
+				printf("new stored key:%s, size:%d\n", temp->next->key, temp->next->size);
+				for(i = 0; i < temp->next->size; i++)
+					printf("value[%d]:%s\n", i, temp->next->vals[i]);
+					
+				printf("position now:%d\n",atnow);*/
+				break;
+			}
+			temp = temp->next;
+		}
+		
+  	}
+  	
+  	
+	
+	
+	*firstt = first;
+	
+	
+}
+
+
+
+  /*Now that all mapping is done, wait for other maps to finish, group-by-key an   *d call reduce
+   */
+  //Wait_for_all_Mappers();
+//  	MPI_Barrier(MPI_COMM_GLOBAL);
+  
+  /*
+  	HTABLE* temp=htable;
+  	struct kv_list* kv;
+  	for(r=0;r<R;r++) {
+    	temp=htable[r];
+    	while(temp!=NULL) {
+    		kv=temp->vals;
+    		REDUCE(kv,r);
+    	}
+	}*/
+//}
+
+//InterKV udefMAP(char* input) {
+  /*User defined MAP function*/
+//}
+
+
+/*Reduce: specify the intermediate key for which the reduce is to be performed*/
+/*
+struct reducer_t REDUCER(struct kv_list* to_reduce,int r)
+{
+  struct reducer_t reduce=_malloc();
+  reduce->next=NULL;
+  MPI_Reduce(/*something goes here*//*);
+  MPI_Reduce((void*)to_reduce ,(void*)reduce->reduced_val ,to_reduce->count,
+            MPI_Datatype datatype, MPI_Op op,r,COMM_GLOBAL)
+    if(rank==r) {
+      /*print the reduced values*//*
+      _PRINT(reduced->val);
+    }
+}
+*/
+/*returns a pointer to the list containing all the (key,value) pairs for a uniq  *key. effectively, traverse the hash table, by keeping some 'global' state in t
+ *he MAP function
  */
+/*struct kv_list* get_next_unique_ikey(HTABLE htable[])
+{ 
+  int r=0;
+  HTABLE* temp=htable;
+  struct kv_list* kv;
+  for(r=0;r<R;r++) {
+    temp=htable[r];
+    while(temp!=NULL) {
+    kv=temp->vals;
+    REDUCE(kv,r);
+    }
+}*/
+
+
+
+/**
+ */ 
+int HASH(KV_t *kv, HTABLE htable[], int R)
+{
+  	/*firstly, determine where key will hash to*/
+  	int bucket_num = hashfun(kv->key, R);
+
+  	char *to_add = kv->value;
+  	/*to_add->next assigned to previous head of list*/
+
+  	/*now figure out the 'top-level' bucket to add it to*/
+ 	struct HT_bucket *buck = htable[bucket_num];
+  	if(buck==NULL) {
+    	/*new->values = malloc(sizeof(char) * LINE_MAX);
+   		new->key = malloc(sizeof(char) * (strlen(kv->key)+1));
+     	*create HT_bucket node, and add the 
+     	*key-val pair to the bucket */
+   		struct HT_bucket* new = (struct HT_bucket*) malloc(sizeof(struct HT_bucket) );
+   		new->values = malloc(sizeof(char) * LINE_MAX);
+   		new->key = malloc(sizeof(char) * (strlen(kv->key)+1));
+    	strcpy(new->key, kv->key);
+    	sprintf(new->values, "%s", to_add);
+    	new->size = strlen(to_add) +1;
+    	
+    	new->next_key=NULL;
+    	htable[bucket_num] = new;
+    	
+    	return 1;
+  	}
+
+	struct HT_bucket *temp=buck;
+	while(1) {
+		
+						// Found right key. Adding entry.
+		if(strcmp(temp->key, kv->key) == 0) {
+			
+			sprintf(&(temp->values[temp->size]), "%s", to_add);
+			temp->size += strlen(to_add)+1;
+			break;
+		}
+						// To next bucket
+		else if(temp->next_key != NULL)
+			temp = temp->next_key;
+			
+		else {			// Adding a new bucket
+			struct HT_bucket* new=(struct HT_bucket*) malloc(sizeof(struct HT_bucket) );
+			new->values = malloc(sizeof(char) * LINE_MAX);
+   			new->key = malloc(sizeof(char) * (strlen(kv->key)+1));
+   			strcpy(new->key, kv->key);
+    		sprintf(new->values, "%s", to_add);
+   			new->size = strlen(to_add) +1;
+   			
+			new->next_key=NULL;
+			temp->next_key=new;
+			break;
+		}
+	}
+	
+	return 1;
+
+} 
+
 int hashfun(char* str,int R) 
 {
 	int h=0;
diff --git a/mapReduce.c~ b/mapReduce.c~
index 6ad5ba4..3bdca5c 100755
--- a/mapReduce.c~
+++ b/mapReduce.c~
@@ -12,22 +12,65 @@
 #include "MAP.h"
 #include "dyn_array.h"
 
-int main(int argc, char** argv)
+
+void mapReduce(int files, char **fnames, int nprocesses, int rprocesses, void *mapfunc, void *reducefunc)
 {
-	/**
-	 * TODO:Spawn all the mpi stuff here..
-	 * TODO:fill the options for MAP.
-	 */
-	initialize_HASH_table() ;
-	MAP(myrank,OPTIONS,udf) ;
-}
+	int numtasks, rank, source=0, dest, tag=1, i, amount;
+	char buf[LINE_MAX], **bucketfnames, bucketfname[LINE_MAX];
+	MPI_File file, *bucketfiles;
+	MPI_Offset size, startpoint;
+	
+	bzero(buf, LINE_MAX);
 
+	MPI_Status stat;
+
+	MPI_Init(NULL,NULL);
+	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
+	
+	if(MPI_File_open(MPI_COMM_WORLD, fnames[0], MPI_MODE_RDONLY | MPI_MODE_UNIQUE_OPEN, MPI_INFO_NULL, &file) < 0) {
+		perror("File open");
+		exit(0);
+	}
+	
+	bucketfnames = malloc(rprocesses * sizeof(char *));
+	//bucketfiles = malloc(rprocesses * sizeof(MPI_File));
+	for(i = 0; i < rprocesses; i++) {
+		sprintf(bucketfname, "file%d", i);
+		bucketfnames[i] = malloc(strlen(bucketfname)+1);
+		sprintf(bucketfnames[i], "%s", bucketfname);
+	}
+
+  	MPI_File_get_size(file, &size);
+  	startpoint = size / numtasks * rank;
+  	
+  	if(rank != numtasks - 1)
+  		amount = size / numtasks * (rank + 1) - startpoint;
+  	else
+  		amount = size - startpoint;
+  	
+    MPI_File_read_at(file, startpoint, buf, amount, MPI_CHAR, &stat);
+  	
+  	MAP (buf, mapfunc, reducefunc, rprocesses, bucketfnames, stat, rank, numtasks);
+  	
+ /* 	printf("rank = %d, data = ", rank);
+  	for(i = 0; i < amount; i++)
+  		printf(" %c",buf[i]);
+  	printf("\n");*/
+  	//MPI_File filee;
+  	printf("The End\n");
+	MPI_Barrier( MPI_COMM_WORLD );
+	printf("Closing\n");
+	
+	MPI_File_close(&file);
+	MPI_Finalize();
+}
 
 void MAP (int rank, struct options OPTIONS, struct udef_functions udf)
 {
 	void* mapfun = udf.mapfunc ;
 	
-	struct MAP_out map_output ; 	//Store the output of udefmap
+	struct MAP_out map_output ;
      	map_output = (*mapfunc)(rank) ;
 	/* User does the input split based on rank. */
 	/* And does dynamic array stuff.*/
@@ -37,19 +80,14 @@ void MAP (int rank, struct options OPTIONS, struct udef_functions udf)
 	int i = 0 ;
 	struct MAP_keyval keyval ;
 
-/*Careful!We now assume that the type of the keyval etc is known. 
- * If we go the size way then modify dynamic array implementation slightly
- */
 	while (i < output.count) {
-
-		keyval = output[i] ;
-
+		keyval = out.put[i] ;
 		HASH (keyval) ;
-
 		i++ ;
 	}
   	
-	populate_tag_tab() ;
+	MPI_Barrier( MPI_COMM_WORLD );
+
 }
 
 
@@ -62,13 +100,9 @@ void initialize_HASH_table()
 }
 
 /**
- * Hash given keyval pair into the Hash_Table. Assumes we have the keycompare function
- * keycmp.Should the user gives that?
+ * Hash given keyval pair into the Hash_Table
  */
 
-/**
- * TODO: CHECK this function for pointer safety and logic.
- */
 int HASH (struct MAP_keyval keyval) 
 {
 	map_key_t key ;
@@ -96,45 +130,6 @@ int HASH (struct MAP_keyval keyval)
 
 	return 1 ;
 }
-#define TAG_MAX	10000 
-//tag_max is max number of tags supported == INT_MAX
-struct tag_entry[TAG_MAX] tag_table ;
-
-/**
- * goes through hash table and populates tag table..
- */
-/*
- * TODO: CHECK for logical loopholes please
- */
-int populate_tag_tab() 
-{
-	HTABLE curr_key = Hash_Table ;
-	map_key_t key ;
-	struct tag_entry t;
-	int i = 0 ;
-	while (curr_key!=NULL) {
-		key = curr_key->key ;
-		tag_table[i].tag = i ;
-		tag_table[i].key = key ;
-		return 1 ;
-	}
-	return 0 ;
-}
-
-/* TODO */
-int is_mapper(int rank) {
-	return 1 ;
-}
-
-/* TODO */
-int is_reducer(int rank) {
-	return 1;
-}
-
-/* TODO */
-int is_merger(int rank) {
-	return 1;
-}
 
 void reduce(MPI_File file, MPI_Status status, void (*reducefunc)(char*, char*)) {
 	int i;
@@ -276,11 +271,6 @@ int intoReduceType(MPI_File file, struct reducer_t** firstt, MPI_Status status)
 		}/*
 		printf("key:%s, size:%d\n", keybuf, values);
 		for(i = 0; i < values; i++)
-			if(readbytes == valsize)
-				break;
-		}/*
-		printf("key:%s, size:%d\n", keybuf, values);
-		for(i = 0; i < values; i++)
 			printf("value[%d]:%s\n", i, valsbuf[i]);
 			*/
 	//	vals = malloc(values * sizeof(char *));
@@ -416,6 +406,11 @@ struct reducer_t REDUCER(struct kv_list* to_reduce,int r)
   MPI_Reduce((void*)to_reduce ,(void*)reduce->reduced_val ,to_reduce->count,
             MPI_Datatype datatype, MPI_Op op,r,COMM_GLOBAL)
     if(rank==r) {
+      /*print the reduced values*//*
+      _PRINT(reduced->val);
+    }
+}
+*/
 /*returns a pointer to the list containing all the (key,value) pairs for a uniq  *key. effectively, traverse the hash table, by keeping some 'global' state in t
  *he MAP function
  */
diff --git a/mapReduce.h b/mapReduce.h
index e071775..5fd0186 100755
--- a/mapReduce.h
+++ b/mapReduce.h
@@ -13,13 +13,23 @@
 
 #define KEYSIZE 100
 
+typedef void* keyy_t;
+typedef void* val_t;
+
+/**
+ */
+
+typedef struct KV_pair {
+  keyy_t key;		/*key_t is used by some library function*/
+  val_t value;
+} KV_pair;
 
 /**
  * Hash Table bucket
  */
 
 struct HT_bucket {
-  map_key_t key ;
+  keyy_t key ;
   size_t size ;
   int count ;	//number of key-val pairs stored.
   struct HT_bucket* next_key ;
@@ -28,16 +38,6 @@ struct HT_bucket {
 
 typedef struct HT_bucket* HTABLE;
 
-/**
- * The tag table entry. Might need more fields. 
- * Stores the tag number and the key it corresponds to. 
- * Might also need to hash the key ??
- */
-struct tag_entry {
-  int tag ;
-  map_key_t key ;
-}
-
 struct reducer_t {
   keyy_t key;
   val_t* vals;
@@ -45,26 +45,25 @@ struct reducer_t {
   struct reducer_t* next;
 };
 
-/**
- * All the user options go here ONLY!
- */
 struct options {
 	int num_map_tasks;
 	int num_reducer_tasks;
 	int total_tasks;
 };
 
-/**
- * All the user defined functions.
- * There will be more if needed. Hash etc an be user-defined too if the user insists
- */
 struct udef_functions {
 	void* (*map) (void*);
 	void* (*reduce) (void*);
 	void* (*merge) (void*);
 };
 
-/*****************************************************************************/
+typedef struct key_list {
+	keyy_t key;
+	key_list* next;
+};
+
+/*******************************************************/
+
 void start_MR (struct options OPTIONS, struct udef_functions udf);
 
 //return list of unique keys?
@@ -74,6 +73,17 @@ int key_to_rank(keyy_t key);
 
 void REDUCE (int rank, struct options OPTIONS, struct udef_functions udf);
 
+
+
+void MAP (char* input_split,void (*mapfunc)(char**, KV_t*), void (*reducefunc)(char*, char*), int R, char **bucketfnames, MPI_Status stat, int rank, int numtasks);
+
+void reduce(MPI_File file, MPI_Status status, void (*reducefunc)(char*, char*));
+
+int intoReduceType(MPI_File file, struct reducer_t** first, MPI_Status status);
+
+void userdefReduce(char* inout, char* in);
+
+
 /***********************************************************/
 
 void itoa(int n, char s[]);
diff --git a/mapReduce.h~ b/mapReduce.h~
index 6a10047..5fd0186 100755
--- a/mapReduce.h~
+++ b/mapReduce.h~
@@ -19,13 +19,17 @@ typedef void* val_t;
 /**
  */
 
+typedef struct KV_pair {
+  keyy_t key;		/*key_t is used by some library function*/
+  val_t value;
+} KV_pair;
 
 /**
  * Hash Table bucket
  */
 
 struct HT_bucket {
-  map_key_t key ;
+  keyy_t key ;
   size_t size ;
   int count ;	//number of key-val pairs stored.
   struct HT_bucket* next_key ;
@@ -34,11 +38,6 @@ struct HT_bucket {
 
 typedef struct HT_bucket* HTABLE;
 
-struct tag_entry {
-  int tag ;
-  map_key_t key ;
-}
-
 struct reducer_t {
   keyy_t key;
   val_t* vals;
@@ -58,7 +57,13 @@ struct udef_functions {
 	void* (*merge) (void*);
 };
 
-/*****************************************************************************/
+typedef struct key_list {
+	keyy_t key;
+	key_list* next;
+};
+
+/*******************************************************/
+
 void start_MR (struct options OPTIONS, struct udef_functions udf);
 
 //return list of unique keys?
@@ -68,6 +73,17 @@ int key_to_rank(keyy_t key);
 
 void REDUCE (int rank, struct options OPTIONS, struct udef_functions udf);
 
+
+
+void MAP (char* input_split,void (*mapfunc)(char**, KV_t*), void (*reducefunc)(char*, char*), int R, char **bucketfnames, MPI_Status stat, int rank, int numtasks);
+
+void reduce(MPI_File file, MPI_Status status, void (*reducefunc)(char*, char*));
+
+int intoReduceType(MPI_File file, struct reducer_t** first, MPI_Status status);
+
+void userdefReduce(char* inout, char* in);
+
+
 /***********************************************************/
 
 void itoa(int n, char s[]);
diff --git a/protocol.c b/protocol.c
deleted file mode 100644
index a8184d8..0000000
--- a/protocol.c
+++ /dev/null
@@ -1,60 +0,0 @@
-/* implementation of the protocol for the comminucation between map process and reduce process */
-#define ACK_TABLE -1
-#define ACK_VALUE -2
-#define SEND_TABLE -3
-#define SEND_VALUE -4 
-
-void mapprocess()
-{
-	int reduce_rank;int flag =0;
-	char *temp; void *buff;
-	if(is_mapper()){
-		reduce_rank = key_to_rank(key);
-		if (!flag){
-		
-			MPI_Send(0, 0,MPI_INT , reduce_rank, SEND_TABLE, MPI_COMM_WORLD);
-			flag=1;
-		}
-		while(1){
-				MPI_Recv(temp, 10, MPI_INT, reduce_rank, TAG, MPI_COMM_WORLD);
-				if( TAG == ACK_TABLE && tagtable[i]!=NULL){
-					//size
-					MPI_Send( &tagtable[i] , /*key->elem_size*/, tag_entry , reduce_rank, SEND_TABLE, MPI_COMM_WORLD);
-				}
-				else if( TAG == ACK_VALUE && mapout[i]!=NULL){
-					MPI_Send( /*MAP_OUT*/ , /*key->elem_size*/, tag_entry , reduce_rank, SEND_VALUE, MPI_COMM_WORLD);
-				}				
-				else{
-					break;
-				}
-		}		
-		
-	}		
-}
-
-void reduceproceess()
-{
-	int map_rank;int flag =0; int source_rank;
-	char *temp; void *buff;
-	if(is_reducer()){
-		
-		
-		while(1){
-				MPI_Recv(temp, 10, MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD);
-				source_rank=status(MPI_SOURCE);
-				if( Tag == SEND_TABLE ){
-					//allocate memory for table values
-					MPI_Send(  0, 0, MPI_INT , status(MPI_SOURCE), ACK_TABLE, MPI_COMM_WORLD);
-				}
-				else if( TAG == SEND_VALUE ){
-					// allocate memory for values
-					MPI_Send( 0, 0, MPI_INT, source_rank, ACK_VALUE, MPI_COMM_WORLD);
-				}				
-				else{
-					break;
-				}
-		}		
-		
-	}		
-
-}
diff --git a/protocol.c~ b/protocol.c~
deleted file mode 100644
index 966ce4e..0000000
--- a/protocol.c~
+++ /dev/null
@@ -1,62 +0,0 @@
-/* implementation of the protocol for the comminucation between map process and reduce process */
-#define ACK_TABLE -1
-#define ACK_VALUE -2
-#define SEND_TABLE -3
-#define SEND_VALUE -4 
-
-void mapprocess()
-{
-	int reduce_rank;int flag =0;
-	char *temp; void *buff;
-	if(is_mapper()){
-		reduce_rank = key_to_rank(key);
-		if (!flag){
-		
-			MPI_Send(0, 0,MPI_INT , reduce_rank, SEND_TABLE, MPI_COMM_WORLD);
-			flag=1;
-		}
-		while(1){
-				MPI_Recv(temp, 10, MPI_INT, reduce_rank, TAG, MPI_COMM_WORLD);
-				if( TAG == ACK_TABLE && tagtable[i]!=NULL){
-					//size
-					MPI_Send( &tagtable[i] , /*key->elem_size*/, tag_entry , reduce_rank, SEND_TABLE, MPI_COMM_WORLD);
-				}
-				else if( TAG == ACK_VALUE && mapout[i]!=NULL){
-					MPI_Send( /*MAP_OUT*/ , /*key->elem_size*/, tag_entry , reduce_rank, SEND_VALUE, MPI_COMM_WORLD);
-				}				
-				else{
-					break;
-				}
-		}		
-		
-	}		
-}
-
-void reduceproceess()
-{
-	int map_rank;int flag =0;
-	char *temp; void *buff;
-	if(is_reducer()){
-		
-		if (!flag){
-		
-			MPI_Send(0, 0,MPI_INT , reduce_rank, SEND_TABLE, MPI_COMM_WORLD);
-			flag=1;
-		}
-		while(1){
-				MPI_Recv(temp, 10, MPI_INT, reduce_rank, TAG, MPI_COMM_WORLD);
-				if( TAG == ACK_TABLE && tagtable[i]!=NULL){
-					//size
-					MPI_Send( &tagtable[i] , /*key->elem_size*/, tag_entry , reduce_rank, SEND_TABLE, MPI_COMM_WORLD);
-				}
-				else if( TAG == ACK_VALUE && mapout[i]!=NULL){
-					MPI_Send( /*MAP_OUT*/ , /*key->elem_size*/, tag_entry , reduce_rank, SEND_VALUE, MPI_COMM_WORLD);
-				}				
-				else{
-					break;
-				}
-		}		
-		
-	}		
-
-}

About

Map Reduce Merge

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages