-
Notifications
You must be signed in to change notification settings - Fork 1
prof123/mapreduce
Folders and files
| Name | Name | Last commit message | Last commit date | |
|---|---|---|---|---|
Repository files navigation
diff --git a/MAP.h b/MAP.h
index 840eb78..bd16dee 100644
--- a/MAP.h
+++ b/MAP.h
@@ -2,23 +2,17 @@
* Header file for the MAP tasks??
*/
-typedef void* map_key_t ;
-typedef void* map_val_t ;
+typedef char* map_key_t ;
+typedef char* map_val_t ;
-/**
- * The key, value pairs handled in MAP. MAP gets this from the user defined map functions, and deals with these types throughout - hashing etc
- */
struct MAP_keyval {
map_key_t key ;
map_val_t value ;
};
-/**
- * This is what the user-defined map function spits out - a dynamic array of key-value pairs, and a count of the number of pairs */
-
struct MAP_out {
int count ; //number of key-value pairs in the dynamic array
- DYNARRAY data ; //actual mapped list //K: add another *, before data.
+ DYNARRAY data ; //actual mapped list
};
diff --git a/MAP.h~ b/MAP.h~
index beba89d..15a0fc3 100644
--- a/MAP.h~
+++ b/MAP.h~
@@ -2,19 +2,15 @@
* Header file for the MAP tasks??
*/
-typedef void* map_key_t ;
-typedef void* map_val_t ;
+typedef char* map_key_t ;
+typedef char* map_val_t ;
-struct MAP_keyval {
+struct map_keyval {
map_key_t key ;
map_val_t value ;
};
-struct MAP_out {
- int count ; //number of key-value pairs in the dynamic array
- DYNARRAY data ; //actual mapped list //K: add another *, before data.
-};
-
+
diff --git a/da_test.c~ b/da_test.c~
index 4870406..f5ede75 100644
--- a/da_test.c~
+++ b/da_test.c~
@@ -3,7 +3,7 @@
#include <stdio.h>
int main() {
- DYNARRAY a = new_array(sizeof(ELEMENT));
+ DYNARRAY a = new_array(sizeof(char));
int c = insert_into(a,32);
printf("%d\n",a->DATA[0]);
}
diff --git a/dyn_array.c b/dyn_array.c
index 0eef029..14951ee 100644
--- a/dyn_array.c
+++ b/dyn_array.c
@@ -1,7 +1,4 @@
#include "dyn_array.h"
-/**
- * This is the dynamic array implementation used
- */
DYNARRAY new_array (size_t elem_size)
{
@@ -13,7 +10,6 @@ DYNARRAY new_array (size_t elem_size)
return array ;
}
-
int insert_into (DYNARRAY array , ELEMENT e) //size of the element
{
DYNARRAY v ;
@@ -27,7 +23,7 @@ int insert_into (DYNARRAY array , ELEMENT e) //size of the element
max = v->max ;
if (count >= max) { /* reached max limit allocated */
v->max = ((v->max)*2) + 1 ;
- new_size = (v->max)*(v->elem_size) ; //
+ new_size = (v->max)*(v->elem_size) ;
v->DATA = (ELEMENT*) realloc(v->DATA , new_size);
}
v->DATA[count] = e ;
@@ -37,21 +33,6 @@ int insert_into (DYNARRAY array , ELEMENT e) //size of the element
}
-/**
- * TODO:Evaluate this
- */
-void* get_i(DYNARRAY array, int position)
-{
- void* v = (void*) array ;
-//Do not know the element type etc. Hmm..
-//
- size_t size = array->elem_size ;
- unsigned int offset = size*position ;
- void* r = v+offset ;
- return r ;
-/*It's critical that the offset is correct :P. Should be able to test this using dyn_test.So another TODO. We are returning a pointer to the caller (MAP actually) and assuming that it knows the size of each element so it only reads those many bytes and there's no overlap etc etc.
- */
-}
-}
+
diff --git a/dyn_array.c~ b/dyn_array.c~
index 0eef029..14951ee 100644
--- a/dyn_array.c~
+++ b/dyn_array.c~
@@ -1,7 +1,4 @@
#include "dyn_array.h"
-/**
- * This is the dynamic array implementation used
- */
DYNARRAY new_array (size_t elem_size)
{
@@ -13,7 +10,6 @@ DYNARRAY new_array (size_t elem_size)
return array ;
}
-
int insert_into (DYNARRAY array , ELEMENT e) //size of the element
{
DYNARRAY v ;
@@ -27,7 +23,7 @@ int insert_into (DYNARRAY array , ELEMENT e) //size of the element
max = v->max ;
if (count >= max) { /* reached max limit allocated */
v->max = ((v->max)*2) + 1 ;
- new_size = (v->max)*(v->elem_size) ; //
+ new_size = (v->max)*(v->elem_size) ;
v->DATA = (ELEMENT*) realloc(v->DATA , new_size);
}
v->DATA[count] = e ;
@@ -37,21 +33,6 @@ int insert_into (DYNARRAY array , ELEMENT e) //size of the element
}
-/**
- * TODO:Evaluate this
- */
-void* get_i(DYNARRAY array, int position)
-{
- void* v = (void*) array ;
-//Do not know the element type etc. Hmm..
-//
- size_t size = array->elem_size ;
- unsigned int offset = size*position ;
- void* r = v+offset ;
- return r ;
-/*It's critical that the offset is correct :P. Should be able to test this using dyn_test.So another TODO. We are returning a pointer to the caller (MAP actually) and assuming that it knows the size of each element so it only reads those many bytes and there's no overlap etc etc.
- */
-}
-}
+
diff --git a/dyn_array.h b/dyn_array.h
index 5ed665c..859f892 100644
--- a/dyn_array.h
+++ b/dyn_array.h
@@ -6,9 +6,7 @@
/**
* Dynamic array implementation
*/
-/**
- * READ THE GODDAMN CODE BEFORE GETTING STUCK HERE FOR 3 WEEKS! >.<
- */
+
#ifndef ELEMENT
typedef char ELEMENT ;
#endif
@@ -16,7 +14,7 @@ typedef char ELEMENT ;
struct dynarry_struct {
int count ;
int max ;
- size_t elem_size ; //SEE THIS? YES SIZE
+ size_t elem_size ;
ELEMENT* DATA ;
};
@@ -28,7 +26,6 @@ DYNARRAY new_array (size_t elem_size) ;
int insert_into (DYNARRAY array , ELEMENT e) ;
-/**THis is for the case if we dont want statically determined types. Array access cant be so straight forward. Also notice the return type, it is a void*, a pointer to some sequence of bytes.*/
-void get_i(DYNARRAY array, int position) ;
+
diff --git a/dyn_array.h~ b/dyn_array.h~
index 859f892..4156133 100644
--- a/dyn_array.h~
+++ b/dyn_array.h~
@@ -8,7 +8,7 @@
*/
#ifndef ELEMENT
-typedef char ELEMENT ;
+extern ELEMENT ;
#endif
struct dynarry_struct {
diff --git a/mapReduce.c b/mapReduce.c
index f285389..a8871e1 100755
--- a/mapReduce.c
+++ b/mapReduce.c
@@ -12,29 +12,65 @@
#include "MAP.h"
#include "dyn_array.h"
-int main(int argc, char** argv)
+
+void mapReduce(int files, char **fnames, int nprocesses, int rprocesses, void *mapfunc, void *reducefunc)
{
- /**
- * TODO:Spawn all the mpi stuff here..
- * TODO:fill the options for MAP.
- */
- if( is_mapper(myrank)) {
-
- initialize_HASH_table() ;
- MAP(myrank,OPTIONS,udf) ;
- }
+ int numtasks, rank, source=0, dest, tag=1, i, amount;
+ char buf[LINE_MAX], **bucketfnames, bucketfname[LINE_MAX];
+ MPI_File file, *bucketfiles;
+ MPI_Offset size, startpoint;
+
+ bzero(buf, LINE_MAX);
- if(is_reducer(myrank)) {
+ MPI_Status stat;
+ MPI_Init(NULL,NULL);
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+ MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
+
+ if(MPI_File_open(MPI_COMM_WORLD, fnames[0], MPI_MODE_RDONLY | MPI_MODE_UNIQUE_OPEN, MPI_INFO_NULL, &file) < 0) {
+ perror("File open");
+ exit(0);
+ }
+
+ bucketfnames = malloc(rprocesses * sizeof(char *));
+ //bucketfiles = malloc(rprocesses * sizeof(MPI_File));
+ for(i = 0; i < rprocesses; i++) {
+ sprintf(bucketfname, "file%d", i);
+ bucketfnames[i] = malloc(strlen(bucketfname)+1);
+ sprintf(bucketfnames[i], "%s", bucketfname);
}
-}
+ MPI_File_get_size(file, &size);
+ startpoint = size / numtasks * rank;
+
+ if(rank != numtasks - 1)
+ amount = size / numtasks * (rank + 1) - startpoint;
+ else
+ amount = size - startpoint;
+
+ MPI_File_read_at(file, startpoint, buf, amount, MPI_CHAR, &stat);
+
+ MAP (buf, mapfunc, reducefunc, rprocesses, bucketfnames, stat, rank, numtasks);
+
+ /* printf("rank = %d, data = ", rank);
+ for(i = 0; i < amount; i++)
+ printf(" %c",buf[i]);
+ printf("\n");*/
+ //MPI_File filee;
+ printf("The End\n");
+ MPI_Barrier( MPI_COMM_WORLD );
+ printf("Closing\n");
+
+ MPI_File_close(&file);
+ MPI_Finalize();
+}
void MAP (int rank, struct options OPTIONS, struct udef_functions udf)
{
void* mapfun = udf.mapfunc ;
- struct MAP_out map_output ; //Store the output of udefmap
+ struct MAP_out map_output ;
map_output = (*mapfunc)(rank) ;
/* User does the input split based on rank. */
/* And does dynamic array stuff.*/
@@ -44,19 +80,14 @@ void MAP (int rank, struct options OPTIONS, struct udef_functions udf)
int i = 0 ;
struct MAP_keyval keyval ;
-/*Careful!We now assume that the type of the keyval etc is known.
- * If we go the size way then modify dynamic array implementation slightly
- */
while (i < output.count) {
-
- keyval = output[i] ;
-
+ keyval = out.put[i] ;
HASH (keyval) ;
-
i++ ;
}
- populate_tag_tab() ;
+ MPI_Barrier( MPI_COMM_WORLD );
+
}
@@ -69,13 +100,9 @@ void initialize_HASH_table()
}
/**
- * Hash given keyval pair into the Hash_Table. Assumes we have the keycompare function
- * keycmp.Should the user gives that?
+ * Hash given keyval pair into the Hash_Table
*/
-/**
- * TODO: CHECK this function for pointer safety and logic.
- */
int HASH (struct MAP_keyval keyval)
{
map_key_t key ;
@@ -110,9 +137,6 @@ struct tag_entry[TAG_MAX] tag_table ;
/**
* goes through hash table and populates tag table..
*/
-/*
- * TODO: CHECK for logical loopholes please
- */
int populate_tag_tab()
{
HTABLE curr_key = Hash_Table ;
@@ -132,20 +156,371 @@ int populate_tag_tab()
int is_mapper(int rank) {
return 1 ;
}
-
/* TODO */
int is_reducer(int rank) {
return 1;
}
-/* TODO */
-int is_merger(int rank) {
- return 1;
+void reduce(MPI_File file, MPI_Status status, void (*reducefunc)(char*, char*)) {
+ int i;
+ struct reducer_t* first, *temp;
+ intoReduceType(file, &first, status);
+
+
+ printf("All the keys in the end:\n");
+
+ temp = first;
+ while(temp != NULL) {
+
+ printf("stored key:%s, size:%d\n", temp->key, temp->size);
+ for(i = 0; i < temp->size; i++)
+ printf("value[%d]:%s\n", i, temp->vals[i]);
+
+ temp = temp->next;
+ }
+
+ /*BEGIN: prateek
+ *Now onto the actual reduce. We are finished with grouping by key now***/
+ temp=first; //first is the first 'bucket'
+ while(temp!=NULL) {
+ int i=0;
+ char* reduced_val = temp->vals[0]; //the first value. ?
+
+ for(i=0; i<(temp->size-1); i++) {
+ //userdefReduce(reduced_val , temp->vals[i+1]);
+ (*reducefunc)(reduced_val, temp->vals[i+1]);
+ }
+
+ printf("KEY:%s , VALUE:%s\n",temp->key,reduced_val);
+
+ temp=temp->next;
+ }
+
+
}
-/**
- * Not needed presently but a simple hash function is always useful!
+/****REDUCED****/
+
+
+/*user defined Reduce function*** - simple addition*/
+/*void userdefReduce(char* inout, char* in)
+{
+ int i=atoi(in);
+ int j=atoi(inout);
+ sprintf(inout, "%d", (i+j));
+}
+*/
+/* itoa: convert n to characters in s. picked from K&R. */
+/*void itoa(int n, char s[])
+{
+ int i, sign;
+
+ if ((sign = n) < 0) /* record sign */
+ /* n = -n; /* make n positive */
+ /* i = 0;
+ do { /* generate digits in reverse order */
+ /* s[i++] = n % 10 + '0'; /* get next digit */
+ /* } while ((n /= 10) > 0); /* delete it */
+/* if (sign < 0)
+ s[i++] = '-';
+ s[i] = '\0';
+ reverse(s);
+}
+
+/* reverse: reverse string s in place.Again K&R. */
+/*void reverse(char s[])
+{
+ int c, i, j;
+
+ for (i = 0, j = strlen(s)-1; i<j; i++, j--) {
+ c = s[i];
+ s[i] = s[j];
+ s[j] = c;
+ }
+}*/
+/*END: prateek */
+
+
+int intoReduceType(MPI_File file, struct reducer_t** firstt, MPI_Status status) {
+ char buf[LINE_MAX], *bufptr, keybuf[100], sizebuf[100], **vals, valsbuf[100][100], valbuf[100];
+ int ret = 0, r, atnow = 0, valsize, values, readbytes, i, a;
+ MPI_Offset startpoint = 0, fsize;
+ struct reducer_t *temp = NULL, *first = NULL, *next;
+
+ bzero(buf, LINE_MAX);
+ //MPI_File_set_view( file, 0, MPI_CHAR, MPI_CHAR, "native", MPI_INFO_NULL );
+ //MPI_File_read(file, buf, LINE_MAX, MPI_CHAR, &status);
+
+ MPI_File_get_size(file, &fsize);
+ MPI_File_read_at(file, startpoint, buf, fsize, MPI_CHAR, &status);
+
+ printf("printing buffer, size = %d: ", fsize);
+ for(r = 0; r < fsize; r++) {
+ if(buf[r] == '\0')
+ printf("_");
+ else
+ printf("%c", buf[r]);
+ }
+ printf("buffer printed\n");
+ //exit(0);
+ if(strlen(buf) == 0)
+ return 0;
+
+ bufptr = buf;
+ while(1) {
+ if((atnow+1) >= fsize)
+ break;
+
+ //bufptr = &buf[atnow];
+
+ bzero(keybuf, 100);
+ memcpy(keybuf, bufptr, strlen(bufptr));
+ atnow += strlen(keybuf) +1;
+ bufptr = &buf[atnow];
+
+ bzero(sizebuf, 100);
+ memcpy(sizebuf, bufptr, strlen(bufptr));
+ atnow += strlen(sizebuf) +1;
+ bufptr = &buf[atnow];
+
+ valsize = atoi(sizebuf);
+ values = 0;
+ readbytes = 0;
+ while(1) {
+ bzero(valbuf, 100);
+ memcpy(valbuf, bufptr, strlen(bufptr));
+ values++;
+ strcpy(valsbuf[values-1], valbuf);
+ readbytes += strlen(valbuf)+1;
+
+ atnow += strlen(valbuf)+1;
+ bufptr = &buf[atnow];
+
+ if(readbytes == valsize)
+ break;
+ }/*
+ printf("key:%s, size:%d\n", keybuf, values);
+ for(i = 0; i < values; i++)
+ printf("value[%d]:%s\n", i, valsbuf[i]);
+ */
+ // vals = malloc(values * sizeof(char *));
+ //for(i = 0; i < values; i++) {
+ // vals[i] = malloc(strlen(valsbuf[i]) * sizeof(char));
+ // strcpy(vals[i], valsbuf[i]);
+ //}
+
+ temp = first;
+
+ // This is for the first key.
+ if(temp == NULL) {
+ temp = malloc(sizeof(struct reducer_t));
+ temp->next = NULL;
+ temp->size = values;
+ temp->key = malloc((strlen(keybuf)+1) * sizeof(char));
+ strcpy(temp->key, keybuf);
+
+ temp->vals = malloc(values * sizeof(char *));
+ for(i = 0; i < values; i++) {
+ temp->vals[i] = malloc((strlen(valsbuf[i])+1) * sizeof(char));
+ strcpy(temp->vals[i], valsbuf[i]);
+ }
+
+ first = temp;
+ /*
+ printf("first stored key:%s, size:%d\n", temp->key, temp->size);
+ for(i = 0; i < temp->size; i++)
+ printf("value[%d]:%s\n", i, temp->vals[i]);
+
+ printf("position now:%d\n",atnow);*/
+
+ continue;
+ }
+
+
+ //printf("");
+
+
+
+ while(1) {
+ if(strcmp(temp->key, keybuf) == 0) {
+ char **copy = temp->vals;
+ temp->vals = malloc((temp->size+values) * sizeof(char *));
+ for(i = 0; i < temp->size; i++) {
+ temp->vals[i] = malloc((strlen(copy[i])+1) * sizeof(char));
+ strcpy(temp->vals[i], copy[i]);
+ }
+ for(a = 0;i < (temp->size+values); a++, i++) {
+ temp->vals[i] = malloc((strlen(valsbuf[a])+1) * sizeof(char));
+ strcpy(temp->vals[i], valsbuf[a]);
+ }
+ for(i = 0; i < temp->size; i++)
+ free(copy[i]);
+ free(copy);
+
+ temp->size += values;
+
+ /*
+ printf("appended key:%s, size:%d\n", temp->key, temp->size);
+ for(i = 0; i < temp->size; i++)
+ printf("value[%d]:%s\n", i, temp->vals[i]);
+
+ printf("position now:%d\n",atnow);*/
+ break;
+ }
+ else if(temp->next == NULL) {
+ next = malloc(sizeof(struct reducer_t));
+ next->next = NULL;
+ next->size = values;
+ next->key = malloc((strlen(keybuf)+1) * sizeof(char));
+ strcpy(next->key, keybuf);
+
+ next->vals = malloc(values * sizeof(char *));
+ for(i = 0; i < values; i++) {
+ next->vals[i] = malloc((strlen(valsbuf[i])+1) * sizeof(char));
+ strcpy(next->vals[i], valsbuf[i]);
+ }
+
+ temp->next = next;
+ /*
+ printf("new stored key:%s, size:%d\n", temp->next->key, temp->next->size);
+ for(i = 0; i < temp->next->size; i++)
+ printf("value[%d]:%s\n", i, temp->next->vals[i]);
+
+ printf("position now:%d\n",atnow);*/
+ break;
+ }
+ temp = temp->next;
+ }
+
+ }
+
+
+
+
+ *firstt = first;
+
+
+}
+
+
+
+ /*Now that all mapping is done, wait for other maps to finish, group-by-key an *d call reduce
+ */
+ //Wait_for_all_Mappers();
+// MPI_Barrier(MPI_COMM_GLOBAL);
+
+ /*
+ HTABLE* temp=htable;
+ struct kv_list* kv;
+ for(r=0;r<R;r++) {
+ temp=htable[r];
+ while(temp!=NULL) {
+ kv=temp->vals;
+ REDUCE(kv,r);
+ }
+ }*/
+//}
+
+//InterKV udefMAP(char* input) {
+ /*User defined MAP function*/
+//}
+
+
+/*Reduce: specify the intermediate key for which the reduce is to be performed*/
+/*
+struct reducer_t REDUCER(struct kv_list* to_reduce,int r)
+{
+ struct reducer_t reduce=_malloc();
+ reduce->next=NULL;
+ MPI_Reduce(/*something goes here*//*);
+ MPI_Reduce((void*)to_reduce ,(void*)reduce->reduced_val ,to_reduce->count,
+ MPI_Datatype datatype, MPI_Op op,r,COMM_GLOBAL)
+ if(rank==r) {
+ /*print the reduced values*//*
+ _PRINT(reduced->val);
+ }
+}
+*/
+/*returns a pointer to the list containing all the (key,value) pairs for a uniq *key. effectively, traverse the hash table, by keeping some 'global' state in t
+ *he MAP function
*/
+/*struct kv_list* get_next_unique_ikey(HTABLE htable[])
+{
+ int r=0;
+ HTABLE* temp=htable;
+ struct kv_list* kv;
+ for(r=0;r<R;r++) {
+ temp=htable[r];
+ while(temp!=NULL) {
+ kv=temp->vals;
+ REDUCE(kv,r);
+ }
+}*/
+
+
+
+/**
+ */
+int HASH(KV_t *kv, HTABLE htable[], int R)
+{
+ /*firstly, determine where key will hash to*/
+ int bucket_num = hashfun(kv->key, R);
+
+ char *to_add = kv->value;
+ /*to_add->next assigned to previous head of list*/
+
+ /*now figure out the 'top-level' bucket to add it to*/
+ struct HT_bucket *buck = htable[bucket_num];
+ if(buck==NULL) {
+ /*new->values = malloc(sizeof(char) * LINE_MAX);
+ new->key = malloc(sizeof(char) * (strlen(kv->key)+1));
+ *create HT_bucket node, and add the
+ *key-val pair to the bucket */
+ struct HT_bucket* new = (struct HT_bucket*) malloc(sizeof(struct HT_bucket) );
+ new->values = malloc(sizeof(char) * LINE_MAX);
+ new->key = malloc(sizeof(char) * (strlen(kv->key)+1));
+ strcpy(new->key, kv->key);
+ sprintf(new->values, "%s", to_add);
+ new->size = strlen(to_add) +1;
+
+ new->next_key=NULL;
+ htable[bucket_num] = new;
+
+ return 1;
+ }
+
+ struct HT_bucket *temp=buck;
+ while(1) {
+
+ // Found right key. Adding entry.
+ if(strcmp(temp->key, kv->key) == 0) {
+
+ sprintf(&(temp->values[temp->size]), "%s", to_add);
+ temp->size += strlen(to_add)+1;
+ break;
+ }
+ // To next bucket
+ else if(temp->next_key != NULL)
+ temp = temp->next_key;
+
+ else { // Adding a new bucket
+ struct HT_bucket* new=(struct HT_bucket*) malloc(sizeof(struct HT_bucket) );
+ new->values = malloc(sizeof(char) * LINE_MAX);
+ new->key = malloc(sizeof(char) * (strlen(kv->key)+1));
+ strcpy(new->key, kv->key);
+ sprintf(new->values, "%s", to_add);
+ new->size = strlen(to_add) +1;
+
+ new->next_key=NULL;
+ temp->next_key=new;
+ break;
+ }
+ }
+
+ return 1;
+
+}
+
int hashfun(char* str,int R)
{
int h=0;
diff --git a/mapReduce.c~ b/mapReduce.c~
index 6ad5ba4..3bdca5c 100755
--- a/mapReduce.c~
+++ b/mapReduce.c~
@@ -12,22 +12,65 @@
#include "MAP.h"
#include "dyn_array.h"
-int main(int argc, char** argv)
+
+void mapReduce(int files, char **fnames, int nprocesses, int rprocesses, void *mapfunc, void *reducefunc)
{
- /**
- * TODO:Spawn all the mpi stuff here..
- * TODO:fill the options for MAP.
- */
- initialize_HASH_table() ;
- MAP(myrank,OPTIONS,udf) ;
-}
+ int numtasks, rank, source=0, dest, tag=1, i, amount;
+ char buf[LINE_MAX], **bucketfnames, bucketfname[LINE_MAX];
+ MPI_File file, *bucketfiles;
+ MPI_Offset size, startpoint;
+
+ bzero(buf, LINE_MAX);
+ MPI_Status stat;
+
+ MPI_Init(NULL,NULL);
+ MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+ MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
+
+ if(MPI_File_open(MPI_COMM_WORLD, fnames[0], MPI_MODE_RDONLY | MPI_MODE_UNIQUE_OPEN, MPI_INFO_NULL, &file) < 0) {
+ perror("File open");
+ exit(0);
+ }
+
+ bucketfnames = malloc(rprocesses * sizeof(char *));
+ //bucketfiles = malloc(rprocesses * sizeof(MPI_File));
+ for(i = 0; i < rprocesses; i++) {
+ sprintf(bucketfname, "file%d", i);
+ bucketfnames[i] = malloc(strlen(bucketfname)+1);
+ sprintf(bucketfnames[i], "%s", bucketfname);
+ }
+
+ MPI_File_get_size(file, &size);
+ startpoint = size / numtasks * rank;
+
+ if(rank != numtasks - 1)
+ amount = size / numtasks * (rank + 1) - startpoint;
+ else
+ amount = size - startpoint;
+
+ MPI_File_read_at(file, startpoint, buf, amount, MPI_CHAR, &stat);
+
+ MAP (buf, mapfunc, reducefunc, rprocesses, bucketfnames, stat, rank, numtasks);
+
+ /* printf("rank = %d, data = ", rank);
+ for(i = 0; i < amount; i++)
+ printf(" %c",buf[i]);
+ printf("\n");*/
+ //MPI_File filee;
+ printf("The End\n");
+ MPI_Barrier( MPI_COMM_WORLD );
+ printf("Closing\n");
+
+ MPI_File_close(&file);
+ MPI_Finalize();
+}
void MAP (int rank, struct options OPTIONS, struct udef_functions udf)
{
void* mapfun = udf.mapfunc ;
- struct MAP_out map_output ; //Store the output of udefmap
+ struct MAP_out map_output ;
map_output = (*mapfunc)(rank) ;
/* User does the input split based on rank. */
/* And does dynamic array stuff.*/
@@ -37,19 +80,14 @@ void MAP (int rank, struct options OPTIONS, struct udef_functions udf)
int i = 0 ;
struct MAP_keyval keyval ;
-/*Careful!We now assume that the type of the keyval etc is known.
- * If we go the size way then modify dynamic array implementation slightly
- */
while (i < output.count) {
-
- keyval = output[i] ;
-
+ keyval = out.put[i] ;
HASH (keyval) ;
-
i++ ;
}
- populate_tag_tab() ;
+ MPI_Barrier( MPI_COMM_WORLD );
+
}
@@ -62,13 +100,9 @@ void initialize_HASH_table()
}
/**
- * Hash given keyval pair into the Hash_Table. Assumes we have the keycompare function
- * keycmp.Should the user gives that?
+ * Hash given keyval pair into the Hash_Table
*/
-/**
- * TODO: CHECK this function for pointer safety and logic.
- */
int HASH (struct MAP_keyval keyval)
{
map_key_t key ;
@@ -96,45 +130,6 @@ int HASH (struct MAP_keyval keyval)
return 1 ;
}
-#define TAG_MAX 10000
-//tag_max is max number of tags supported == INT_MAX
-struct tag_entry[TAG_MAX] tag_table ;
-
-/**
- * goes through hash table and populates tag table..
- */
-/*
- * TODO: CHECK for logical loopholes please
- */
-int populate_tag_tab()
-{
- HTABLE curr_key = Hash_Table ;
- map_key_t key ;
- struct tag_entry t;
- int i = 0 ;
- while (curr_key!=NULL) {
- key = curr_key->key ;
- tag_table[i].tag = i ;
- tag_table[i].key = key ;
- return 1 ;
- }
- return 0 ;
-}
-
-/* TODO */
-int is_mapper(int rank) {
- return 1 ;
-}
-
-/* TODO */
-int is_reducer(int rank) {
- return 1;
-}
-
-/* TODO */
-int is_merger(int rank) {
- return 1;
-}
void reduce(MPI_File file, MPI_Status status, void (*reducefunc)(char*, char*)) {
int i;
@@ -276,11 +271,6 @@ int intoReduceType(MPI_File file, struct reducer_t** firstt, MPI_Status status)
}/*
printf("key:%s, size:%d\n", keybuf, values);
for(i = 0; i < values; i++)
- if(readbytes == valsize)
- break;
- }/*
- printf("key:%s, size:%d\n", keybuf, values);
- for(i = 0; i < values; i++)
printf("value[%d]:%s\n", i, valsbuf[i]);
*/
// vals = malloc(values * sizeof(char *));
@@ -416,6 +406,11 @@ struct reducer_t REDUCER(struct kv_list* to_reduce,int r)
MPI_Reduce((void*)to_reduce ,(void*)reduce->reduced_val ,to_reduce->count,
MPI_Datatype datatype, MPI_Op op,r,COMM_GLOBAL)
if(rank==r) {
+ /*print the reduced values*//*
+ _PRINT(reduced->val);
+ }
+}
+*/
/*returns a pointer to the list containing all the (key,value) pairs for a uniq *key. effectively, traverse the hash table, by keeping some 'global' state in t
*he MAP function
*/
diff --git a/mapReduce.h b/mapReduce.h
index e071775..5fd0186 100755
--- a/mapReduce.h
+++ b/mapReduce.h
@@ -13,13 +13,23 @@
#define KEYSIZE 100
+typedef void* keyy_t;
+typedef void* val_t;
+
+/**
+ */
+
+typedef struct KV_pair {
+ keyy_t key; /*key_t is used by some library function*/
+ val_t value;
+} KV_pair;
/**
* Hash Table bucket
*/
struct HT_bucket {
- map_key_t key ;
+ keyy_t key ;
size_t size ;
int count ; //number of key-val pairs stored.
struct HT_bucket* next_key ;
@@ -28,16 +38,6 @@ struct HT_bucket {
typedef struct HT_bucket* HTABLE;
-/**
- * The tag table entry. Might need more fields.
- * Stores the tag number and the key it corresponds to.
- * Might also need to hash the key ??
- */
-struct tag_entry {
- int tag ;
- map_key_t key ;
-}
-
struct reducer_t {
keyy_t key;
val_t* vals;
@@ -45,26 +45,25 @@ struct reducer_t {
struct reducer_t* next;
};
-/**
- * All the user options go here ONLY!
- */
struct options {
int num_map_tasks;
int num_reducer_tasks;
int total_tasks;
};
-/**
- * All the user defined functions.
- * There will be more if needed. Hash etc an be user-defined too if the user insists
- */
struct udef_functions {
void* (*map) (void*);
void* (*reduce) (void*);
void* (*merge) (void*);
};
-/*****************************************************************************/
+typedef struct key_list {
+ keyy_t key;
+ key_list* next;
+};
+
+/*******************************************************/
+
void start_MR (struct options OPTIONS, struct udef_functions udf);
//return list of unique keys?
@@ -74,6 +73,17 @@ int key_to_rank(keyy_t key);
void REDUCE (int rank, struct options OPTIONS, struct udef_functions udf);
+
+
+void MAP (char* input_split,void (*mapfunc)(char**, KV_t*), void (*reducefunc)(char*, char*), int R, char **bucketfnames, MPI_Status stat, int rank, int numtasks);
+
+void reduce(MPI_File file, MPI_Status status, void (*reducefunc)(char*, char*));
+
+int intoReduceType(MPI_File file, struct reducer_t** first, MPI_Status status);
+
+void userdefReduce(char* inout, char* in);
+
+
/***********************************************************/
void itoa(int n, char s[]);
diff --git a/mapReduce.h~ b/mapReduce.h~
index 6a10047..5fd0186 100755
--- a/mapReduce.h~
+++ b/mapReduce.h~
@@ -19,13 +19,17 @@ typedef void* val_t;
/**
*/
+typedef struct KV_pair {
+ keyy_t key; /*key_t is used by some library function*/
+ val_t value;
+} KV_pair;
/**
* Hash Table bucket
*/
struct HT_bucket {
- map_key_t key ;
+ keyy_t key ;
size_t size ;
int count ; //number of key-val pairs stored.
struct HT_bucket* next_key ;
@@ -34,11 +38,6 @@ struct HT_bucket {
typedef struct HT_bucket* HTABLE;
-struct tag_entry {
- int tag ;
- map_key_t key ;
-}
-
struct reducer_t {
keyy_t key;
val_t* vals;
@@ -58,7 +57,13 @@ struct udef_functions {
void* (*merge) (void*);
};
-/*****************************************************************************/
+typedef struct key_list {
+ keyy_t key;
+ key_list* next;
+};
+
+/*******************************************************/
+
void start_MR (struct options OPTIONS, struct udef_functions udf);
//return list of unique keys?
@@ -68,6 +73,17 @@ int key_to_rank(keyy_t key);
void REDUCE (int rank, struct options OPTIONS, struct udef_functions udf);
+
+
+void MAP (char* input_split,void (*mapfunc)(char**, KV_t*), void (*reducefunc)(char*, char*), int R, char **bucketfnames, MPI_Status stat, int rank, int numtasks);
+
+void reduce(MPI_File file, MPI_Status status, void (*reducefunc)(char*, char*));
+
+int intoReduceType(MPI_File file, struct reducer_t** first, MPI_Status status);
+
+void userdefReduce(char* inout, char* in);
+
+
/***********************************************************/
void itoa(int n, char s[]);
diff --git a/protocol.c b/protocol.c
deleted file mode 100644
index a8184d8..0000000
--- a/protocol.c
+++ /dev/null
@@ -1,60 +0,0 @@
-/* implementation of the protocol for the comminucation between map process and reduce process */
-#define ACK_TABLE -1
-#define ACK_VALUE -2
-#define SEND_TABLE -3
-#define SEND_VALUE -4
-
-void mapprocess()
-{
- int reduce_rank;int flag =0;
- char *temp; void *buff;
- if(is_mapper()){
- reduce_rank = key_to_rank(key);
- if (!flag){
-
- MPI_Send(0, 0,MPI_INT , reduce_rank, SEND_TABLE, MPI_COMM_WORLD);
- flag=1;
- }
- while(1){
- MPI_Recv(temp, 10, MPI_INT, reduce_rank, TAG, MPI_COMM_WORLD);
- if( TAG == ACK_TABLE && tagtable[i]!=NULL){
- //size
- MPI_Send( &tagtable[i] , /*key->elem_size*/, tag_entry , reduce_rank, SEND_TABLE, MPI_COMM_WORLD);
- }
- else if( TAG == ACK_VALUE && mapout[i]!=NULL){
- MPI_Send( /*MAP_OUT*/ , /*key->elem_size*/, tag_entry , reduce_rank, SEND_VALUE, MPI_COMM_WORLD);
- }
- else{
- break;
- }
- }
-
- }
-}
-
-void reduceproceess()
-{
- int map_rank;int flag =0; int source_rank;
- char *temp; void *buff;
- if(is_reducer()){
-
-
- while(1){
- MPI_Recv(temp, 10, MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD);
- source_rank=status(MPI_SOURCE);
- if( Tag == SEND_TABLE ){
- //allocate memory for table values
- MPI_Send( 0, 0, MPI_INT , status(MPI_SOURCE), ACK_TABLE, MPI_COMM_WORLD);
- }
- else if( TAG == SEND_VALUE ){
- // allocate memory for values
- MPI_Send( 0, 0, MPI_INT, source_rank, ACK_VALUE, MPI_COMM_WORLD);
- }
- else{
- break;
- }
- }
-
- }
-
-}
diff --git a/protocol.c~ b/protocol.c~
deleted file mode 100644
index 966ce4e..0000000
--- a/protocol.c~
+++ /dev/null
@@ -1,62 +0,0 @@
-/* implementation of the protocol for the comminucation between map process and reduce process */
-#define ACK_TABLE -1
-#define ACK_VALUE -2
-#define SEND_TABLE -3
-#define SEND_VALUE -4
-
-void mapprocess()
-{
- int reduce_rank;int flag =0;
- char *temp; void *buff;
- if(is_mapper()){
- reduce_rank = key_to_rank(key);
- if (!flag){
-
- MPI_Send(0, 0,MPI_INT , reduce_rank, SEND_TABLE, MPI_COMM_WORLD);
- flag=1;
- }
- while(1){
- MPI_Recv(temp, 10, MPI_INT, reduce_rank, TAG, MPI_COMM_WORLD);
- if( TAG == ACK_TABLE && tagtable[i]!=NULL){
- //size
- MPI_Send( &tagtable[i] , /*key->elem_size*/, tag_entry , reduce_rank, SEND_TABLE, MPI_COMM_WORLD);
- }
- else if( TAG == ACK_VALUE && mapout[i]!=NULL){
- MPI_Send( /*MAP_OUT*/ , /*key->elem_size*/, tag_entry , reduce_rank, SEND_VALUE, MPI_COMM_WORLD);
- }
- else{
- break;
- }
- }
-
- }
-}
-
-void reduceproceess()
-{
- int map_rank;int flag =0;
- char *temp; void *buff;
- if(is_reducer()){
-
- if (!flag){
-
- MPI_Send(0, 0,MPI_INT , reduce_rank, SEND_TABLE, MPI_COMM_WORLD);
- flag=1;
- }
- while(1){
- MPI_Recv(temp, 10, MPI_INT, reduce_rank, TAG, MPI_COMM_WORLD);
- if( TAG == ACK_TABLE && tagtable[i]!=NULL){
- //size
- MPI_Send( &tagtable[i] , /*key->elem_size*/, tag_entry , reduce_rank, SEND_TABLE, MPI_COMM_WORLD);
- }
- else if( TAG == ACK_VALUE && mapout[i]!=NULL){
- MPI_Send( /*MAP_OUT*/ , /*key->elem_size*/, tag_entry , reduce_rank, SEND_VALUE, MPI_COMM_WORLD);
- }
- else{
- break;
- }
- }
-
- }
-
-}
About
Map Reduce Merge
Resources
Stars
Watchers
Forks
Releases
No releases published
Packages 0
No packages published