Progetto di Reti di Calcolatori
Decisione di migrazione in sistemi distribuiti
Guida al Codice:
/* Codice processo Nodo*/
#include<stdio.h>
#include<fcntl.h>
#include<signal.h>
#include<sys/types.h>
#include<sys/wait.h>
#include<sys/uio.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<unistd.h>
#include<netdb.h>
#include<string.h>
#include<errno.h>
#define or ||
#define and &&
#define ever (;;)
const int molto_scarico=-2;
const int scarico=-1;
const int carico=1;
const int molto_carico=2;
const int equilibrio=0;
const int porta_media=25000;
const int porta_mscarico=25001;
const int porta_scarico=25002;
const int porta_carico=25003;
const int porta_mcarico=25004;
const int porta_stato=25005;
const int max_nodi=10;
void inizializza(int *stat,int *processi,int *med)
{ *stat=-2;
*processi=0;
*med=processi_attivi();
}
int processi_attivi()
{
int p1[2];
int proc,fr;
char buff[80];
pipe(p1);
proc=0;
if ((fr=fork())==0)
{ close (1);
dup(p1[1]);
close(p1[1]);close(p1[0]);
execl("/bin/ps", "ps","-ac",(char
*)0);
exit(4);
}
close (p1[1]);
while (read(p1[0],buff,80)>0)
{ ++proc;
}
close(p1[0]);
printf("proc %d\n",proc);
return (proc);
}
/*queste procedure sono inutili, ma sono un eredita' della prima
versione orientata agli oggetti, anche se qui sono stati riviste
e corrette */
int media(int *med) {return (*med);}
int aggiorna_media(int m) {return (m);}
int stato(int stat) {return (stat);}
void aggiorna_stato (int s,int *stat)
{if ((s=-2)or(s=-1)or(s=1)or(s=2)or(s=0))
*stat=s;
else {perror("impossible value of stato");
exit(1);
}
}/*fine agg_stato*/
void finefiglio (int sign)
{ int stato;
wait(&stato);
};
int calc_media(char *ws,int med)
{ int sd,m,p,p1,mm,mf,p2,m1,m2,mp,i;
char *buff;
struct hostent *host;
struct sockaddr_in rem_ind;
FILE *f;
printf("sono in calcmedia\n");
memset ((char*)&rem_ind,0,sizeof(struct sockaddr_in));
buff=(char * ) malloc (80);
/* connessione al nodo di sin*/
sscanf(ws,"lia%d",&i);
switch (i) {
case 5: i=10;
break;
case 7: i=5;
break;
default: --i;
}
printf("i vale %d\n",i);
if (i==10) sprintf(ws,"lia%d",i);
else sprintf(ws,"lia0%d",i);
printf("ws e' %s\n",ws);
host=gethostbyname(ws);
/*ws e' ilnome della workstation
che verra' di volta in volta passato*/
if (host==NULL) {perror("host not found");
exit(2);
}
rem_ind.sin_family=AF_INET;
rem_ind.sin_addr.s_addr=((struct in_addr*)
(host->h_addr))->s_addr;
rem_ind.sin_port=porta_media; /*porta e' una variabile alla quale
associamo la porta corretta del servizio*/
if((sd=socket(AF_INET, SOCK_STREAM, 0))<0)
{perror ("error in socket");
exit(3);
}
f=fdopen(sd,"r+");/*ricavo il formato giusto per la
flush*/
printf("sd e' %d\n",sd);
printf ("mi sto per connetere\n");
if (connect(sd,(struct sockaddr*)&rem_ind, sizeof(struct
sockaddr))<0)
{perror ("error in connect");
exit(3);
}
printf("buff e' %s\n",buff);
if ((write(sd,buff,6))<0)
{perror ("error in write");
exit(3);
}
/*fflush(f);*/
printf("ho scritto a %s\n",ws);
if ((read(sd,buff,6))<0)
{perror ("error in read");
exit(3);
}
printf("ho letto da %s\n",ws);
printf("buff %s\n",buff);
sscanf(buff,"%d",&p1);
if (write(sd,buff,6)<0)
{perror ("error in write");
exit(3);
}
/*fflush(f);*/
printf("scrit 2\n");
shutdown(sd,1);
if (read(sd,buff,6)<0)
{perror ("error in read");
exit(3);
}
printf("buff %s\n",buff);
sscanf(buff,"%d",&m1);
printf("buff %d\n",m1);
close(sd);
/*connessione al nodo di dex*/
memset ((char*)&rem_ind,0,sizeof(struct sockaddr_in));
switch (i) {
case 5: i=8;
break;
case 10: i=7;
break;
case 9: i=5;
break;
default: i=i+2;
}
printf("i vale %d\n",i);
if (i==10) sprintf(ws,"lia%d",i);
else sprintf(ws,"lia0%d",i);
printf("ws e' %s\n",ws);
host=gethostbyname(ws); /*ws e' ilnome della workstation
che verra' di volta in volta passato*/
if (host==NULL) {perror("host not found");
exit(2);
}
rem_ind.sin_family=AF_INET;
rem_ind.sin_addr.s_addr=((struct in_addr*)
(host->h_addr))->s_addr;
rem_ind.sin_port=porta_media; /*porta e' una variabile alla quale
associamo la porta corretta del servizio*/
if((sd=socket(AF_INET, SOCK_STREAM, 0))<0)
{perror ("error in socket");
exit(3);
}
printf ("mi sto per connetere\n");
if (connect(sd,(struct sockaddr*)&rem_ind,sizeof(struct
sockaddr))<0)
{perror ("error in connect");
exit(3);
}
printf("buff e' %s\n",buff);
if (write(sd,buff,80)<0)
{perror ("error in write");
exit(3);
}
printf("ho scritto a %s\n",ws);
if (read(sd,buff,5)<0)
{perror ("error in read");
exit(3);
}
printf("ho letto da %s\n",ws);
sscanf(buff,"%d",&p2);
printf("buff e' %s\n",buff);
if (write(sd,buff,80)<0)
{perror ("error in write");
exit(3);
}
shutdown(sd,1);
if (read(sd,buff,5)<0)
{perror ("error in read");
exit(3);
}
sscanf(buff,"%d",&m2);
close(sd);
/*calcolo della media*/
p=processi_attivi();
mp=(p1+p2+p)/3;
m=med;
mm=(m+m1+m2)/3;
mf=(mp+mm)/2;
return (mf);
printf("media %d \n",mf);
}
/* procedura che da le medie agli altri nodi */
void fornisci_media(int *med)
{
int sd,ns,fromlen,prc,md,fr;
char *buff;
struct hostent *host;
struct sockaddr_in rem_ind,mio_ind;
FILE *f;
memset ((char*)&rem_ind,0,sizeof(struct sockaddr_in));
memset ((char*)&mio_ind,0,sizeof(struct sockaddr_in));
buff=(char * ) malloc (80);
fromlen=sizeof(struct sockaddr_in);
sd=socket(AF_INET,SOCK_STREAM,0);
/*setsockopt(sd,SOL_SOCKET,SO_RCVBUF,&bs,sizeof(bs));*/
mio_ind.sin_family=AF_INET;
mio_ind.sin_port=porta_media;
if (bind(sd,(struct sockaddr*)&mio_ind,sizeof(struct
sockaddr_in))<0)
{perror ("error in bind");
exit(3);
}
listen(sd,5);
for ever
{
if ((ns=accept(sd,(struct
sockaddr*)&rem_ind,&fromlen))<0)
continue;
if ((fr=fork())==0) /*figlio*/
{
printf("fr %d\n",fr);
close(sd);
f=fdopen(ns,"r+");/*ricavo il formato giusto per la
flush*/
printf("ns %d\n",ns);
printf("prima di read in for media\n");
if (read(ns,buff,2)<0)
{perror ("error in read");
exit(3);
}
printf("dopo read in for media\n");
prc=processi_attivi();
printf("prc=%d\n",prc);
sprintf(buff,"%d",prc);
if (write(ns,buff,6)<0)
{perror ("error in write");
exit(3);
}
/*fflush(f);*/
if (read(ns,buff,5)<0)
{perror ("error in read");
exit(3);
}
md=*med;
printf("med=%d\n",md);
sprintf(buff,"%d",md);
shutdown(ns,0);
if (write(ns,buff,6)<0)
{perror ("error in write");
exit(3);
}
fflush(f);
printf("ho scritto la media\n");
close(ns);
} /*fine figlio*/
close(ns);
} /*fine for*/
close(sd);
} /* fine procedura*/
/*Procedura di migrazione per nodo CARICO*/
void n_carico(char *ws)
{
int i,sd,pid,pid1,pid2,c,fr;
int p1[2];
char *risposta;
char *programma;
char buff[80];
char buff1[80];
char spid[6];
struct hostent *host;
struct sockaddr_in rem_ind;
FILE *f;
sscanf(ws,"lia%d",&i);
while ((c<=max_nodi)and(risposta!="y"))
{
printf("i vale %d\n",i);
if (i==10) sprintf(ws,"lia%d",i);
else sprintf(ws,"lia0%d",i);
printf("ws e' %s\n",ws);
memset ((char*)&rem_ind,0,sizeof(struct sockaddr_in));
risposta=(char *) malloc (80);
programma=(char *) malloc (80);
host=gethostbyname(ws); /*ws e' ilnome della workstation
che verra' di volta in volta passato*/
if (host==NULL) {perror("host not found");
exit(2);
}
rem_ind.sin_family=AF_INET;
rem_ind.sin_addr.s_addr=((struct in_addr*)
(host->h_addr))->s_addr;
rem_ind.sin_port=porta_scarico; /*porta e' una variabile alla
quale
associamo la porta corretta del servizio*/
sd=socket(AF_INET,SOCK_STREAM,0);
f=fdopen(sd,"r+");/*ricavo il formato giusto per la
flush*/
if (connect(sd,(struct sockaddr *)&rem_ind,sizeof(struct
sockaddr))<0)
{perror ("error in connect");
exit(3);
}
if (write(sd,"carico",7)<0)
{perror ("error in write");
exit(3);
}
fflush(f);
if (read(sd,risposta,2)<0)
{perror ("error in read");
exit(3);
}
if (risposta!="y") close(sd);
printf("risposta a carico %s\n",risposta);
switch (i) {
case 5: i=7;
break;
case 10: i=5;
break;
default: ++i;
}
++c;/*conta il num di connessioni*/
}/*end while*/
if (risposta=="y") /* nodo scarico trovato*/
{/*bisogna associare il nome del programma al pid corretto
per fare cio' si deve usare un figlio ed una pipe*/
pipe (p1);
if ((fr=fork())==0) /*figlio che esegue la ps di unix per avere
il pid*/
{ close(1); /*std out*/
dup(p1[1]);
close(p1[1]); close(p1[0]);
execl("/bin/ps","ps","-ac",(char*)0);
exit(4);
} /*fine figlio*/
close(p1[1]);
pid2=0;pid1=0;
while (read(p1[0],buff,80)>0) { /* uccido il penultimo pid,
l'ultimo e' il ps e leggo finche il ps mi restituisce qualcosa*/
for (i=0; i<6; i++)
spid[i]=buff[i]; /*isola il pid dalla stringa*/
sscanf(spid,"%d",&pid);
for (i=30; i<36; i++)
buff1[i]=buff[i]; /*isola il nome del programma*/
if (pid>pid2) pid2=pid;
if ((pid<pid2)and(pid>pid1)) { pid1=pid;
programma=buff1;
}
} /*end while input*/
close(p1[0]);
kill(pid1,9);
shutdown (sd,0);
if (write(sd,programma,strlen(programma))<0)
{perror ("error in write");
exit(3);
}
close(sd);
} /*end if*/
}/* fine procedura*/
/*Procedura nodo scarico(tipo server)*/
void n_scarico(int stat)
{
int i,sd,ns,fromlen,fr;
char *richiesta;
char *programma;
struct hostent *host;
struct sockaddr_in rem_ind,mio_ind;
FILE *f;
memset ((char*)&rem_ind,0,sizeof(struct sockaddr_in));
memset ((char*)&mio_ind,0,sizeof(struct sockaddr_in));
richiesta=(char *) malloc (80);
programma=(char *) malloc (80);
fromlen=sizeof(struct sockaddr_in);
sd=socket(AF_INET,SOCK_STREAM,0);
mio_ind.sin_family=AF_INET;
mio_ind.sin_port=porta_scarico;
if (bind(sd,(struct sockaddr*)&mio_ind,sizeof(struct
sockaddr_in))<0)
{perror ("error in bind");
exit(3);
}
listen(sd,5);
for ever
{
if ((ns=accept(sd,(struct
sockaddr*)&rem_ind,&fromlen))<0)
continue;
if ((fr=fork())==0) {
/*figlio*/
close(sd);
f=fdopen(ns,"r+");/*ricavo il formato giusto per la
flush*/
if (read(ns,richiesta,10)<0)
{perror ("error in read");
exit(3);
}
if (stato(stat)==scarico)
{
if (write(ns,"y",2)<0)
{perror ("error in write");
exit(3);
}
/*fflush(f);*/
shutdown(ns,1);
if (read(ns,programma,10)<0)
{perror ("error in read");
exit(3);
}
printf("programma in scarico %s\n",programma);
execle(programma,(char*)0);
}/* end if di stato*/
else {
shutdown(ns,0);
if (write(ns,"n",2)<0) /*comunichiamo che il nodo
non e' scarico*/
{perror ("error in write");
exit(3);
}
/*fflush(f);*/
}/*end else*/
close(ns);
} /*end figlio*/
} /*end forever*/
} /*fine procedura*/
/*procedura nodo molto scarico: si comporta come un
client che va alla ricerca di un nodo molto carico*/
void n_molto_scarico(char *ws)
{
int i,sd,c;
char *risposta;
char *programma;
struct hostent *host;
struct sockaddr_in rem_ind;
FILE *f;
sscanf(ws,"lia%d",&i);
while ((c<=max_nodi)and(risposta!="y"))
/*collegamento alla ricerca di un nodo scarico*/
{
printf("i vale %d\n",i);
if (i==10) sprintf(ws,"lia%d",i);
else sprintf(ws,"lia0%d",i);
printf("ws e' %s\n",ws);
memset ((char*)&rem_ind,0,sizeof(struct sockaddr_in));
risposta=(char *) malloc (80);
programma=(char *) malloc (80);
host=gethostbyname(ws); /*ws e' ilnome della workstation
che verra' di volta in volta passato*/
if (host==NULL) {perror("host not found");
exit(2);
}
rem_ind.sin_family=AF_INET;
rem_ind.sin_addr.s_addr=((struct in_addr*)
(host->h_addr))->s_addr;
rem_ind.sin_port=porta_mcarico; /*porta e' una variabile alla
quale
associamo la porta corretta del servizio*/
sd=socket(AF_INET,SOCK_STREAM,0);
f=fdopen(sd,"r+");/*ricavo il formato giusto per la
flush*/
if (connect(sd,(struct sockaddr *)&rem_ind,sizeof(struct
sockaddr))<0)
{perror ("error in connect");
exit(3);
}
if (write(sd,"mtscarico",7)<0)
{perror ("error in write");
exit(3);
}
fflush(f);
if (read(sd,risposta,2)<0)
{perror ("error in read");
exit(3);
}
if (risposta!="y") close(sd);
printf("risposta in molscarico %s\n",risposta);
switch (i) {
case 5: i=7;
break;
case 10: i=5;
break;
default: ++i;
}
++c;
}/*end while*/
if (risposta=="y") /* nodo molto carico trovato*/
{
if (write(sd,"accetto",8)<0)
{perror ("error in write");
exit(3);
}
fflush(f);
if (read(sd,programma,10)<0)
{perror ("error in read");
exit(3);
}
close(sd);
execle(programma,(char*)0);
} /*end if*/
} /*fine procedura*/
/*procedura nodo molto carico: si comporta come un server che
aspetta
qualcuno che lo liberi da un processo*/
void n_molto_carico(int stat)
{
int i,sd,ns,fromlen,pid,pid1,pid2,fr;
int p1[2];
char richiesta;
char *programma;
char *disponibilita;
char buff[80];
char buff1[80];
char spid[6];
struct hostent *host;
struct sockaddr_in rem_ind,mio_ind;
FILE *f;
memset ((char*)&rem_ind,0,sizeof(struct sockaddr_in));
memset ((char*)&mio_ind,0,sizeof(struct sockaddr_in));
disponibilita=(char *) malloc (80);
programma=(char *) malloc (80);
fromlen=sizeof(struct sockaddr_in);
sd=socket(AF_INET,SOCK_STREAM,0);
mio_ind.sin_family=AF_INET;
mio_ind.sin_port=porta_mcarico;
if (bind(sd,(struct sockaddr*)&mio_ind,sizeof(struct
sockaddr_in))<0)
{perror ("error in bind");
exit(3);
}
listen(sd,5);
sigset(SIGCLD,finefiglio);
for ever
{
if ((ns=accept(sd,(struct
sockaddr*)&rem_ind,&fromlen))<0)
continue;
if ((fr=fork())==0) {
/*figlio*/
close(sd);
f=fdopen(ns,"r+");/*ricavo il formato giusto per la
flush*/
if (read(ns,disponibilita,10)<0)
{perror ("error in read");
exit(3);
}
if (stato(stat)==molto_carico)
{
if (write(ns,"y",2)<0)
{perror ("error in write");
exit(3);
}
fflush(f);
if (read(ns,disponibilita,10)<0)
{perror ("error in read");
exit(3);
}
/*troviamo il pid e il processo corrispondente da eliminare*/
pipe (p1);
if ((fr=fork())==0) /*figlio che esegue la ps di unix per avere
il pid*/
{ close(1); /*std out*/
dup(p1[1]);
close(p1[1]); close(p1[0]);
execl("/bin/ps","ps","-ac",(char*)0);
exit(4);
} /*fine figlio*/
close(p1[1]);
pid2=0;pid1=0;
while (read(p1[0],buff,80)>0) { /* uccido il penultimo pid,
l'ultimo e' il ps e leggo finche il ps mi restituisce qualcosa*/
for (i=0; i<6; i++)
spid[i]=buff[i]; /*isola il pid dalla stringa*/
sscanf(spid,"%d",&pid);
for (i=30; i<36; i++)
buff1[i]=buff[i]; /*isola il nome del programma*/
if (pid>pid2) pid2=pid;
if ((pid<pid2)and(pid>pid1)) { pid1=pid;
programma=buff1;
}
} /*end while input*/
close(p1[0]);
kill(pid1,9);
if (write(ns,programma,strlen(programma))<0)
{perror ("error in write");
exit(3);
}
fflush(f);
} /*end if stato ramo then*/
else {
shutdown(ns,0);
if (write(ns,"n",2)<0) /*comunichiamo che il nodo
non e' molto carico*/
{perror ("error in write");
exit(3);
}
fflush(f);
}/*end else*/
close(ns);
} /*end figlio*/
} /*end forever*/
} /*fine procedura*/
main(int argc,char *argv[])
{ /*in argv[1] c'e' il nome della lia e quindi del nodo di
esecuzione*/
char *eseguibile;
char *mialia;
int pid_scar,pid_mltcar,pid_med,processi_prima,processi_ora,m;
int stat, processi, med;
mialia=argv[1];
sigset(SIGCHLD,finefiglio);
/*inizializzazione del nodo*/
inizializza(&stat,&processi,&med);
processi_prima=processi_attivi(); printf("processi_prima
%d\n",processi_prima); /*processi prima ci dice i processi
attivi al precedente passaggio*/
if ((pid_scar=fork())==0) {n_scarico(stat);}
if ((pid_mltcar=fork())==0) {n_molto_carico(stat);}
if ((pid_med=fork())==0) {fornisci_media(&med);}
/*sono processi sempre attivi*/
do /*ciclo do while simile al repeat until*/
{
printf("\n> ");/*scrivi il prompt*/
scanf("%s",eseguibile);
printf ("eseg %s\n",eseguibile);
printf ("sono %s\n",mialia);
if (strcmp(eseguibile,"end")!=0)
{ printf ("eseg %s\n",eseguibile);
if(fork()==0)
execle(eseguibile,(char *)0);
processi_ora=processi_attivi();
if
((processi_prima<(processi_ora-(0.1*processi_ora)))or(processi_prima>(processi_ora+(0.1*processi_ora))))
/*a fronte di una variazione in eccesso o difetto del 10%
ricalcoliamo la media*/
{m=calc_media(mialia,med);
med=m;
}
mialia=argv[1];
if (m<(processi_ora-(0.25*processi_ora)))
{ aggiorna_stato(molto_scarico,&stat);
n_molto_scarico(mialia);
}
mialia=argv[1];
if
((m>=(processi_ora-(0.25*processi_ora)))and(m<(processi_ora-(0.1*processi_ora))))
aggiorna_stato(scarico,&stat);
if
((m>(processi_ora+(0.1*processi_ora)))and(m<(processi_ora+(0.25*processi_ora))))
{ aggiorna_stato(carico,&stat);
n_carico(argv[1]);
}
if (m>(processi_ora+(0.25*processi_ora)))
aggiorna_stato(molto_carico,&stat);
if
((m>=(processi_ora-(0.1*processi_ora)))and(m<=(processi_ora+(0.1*processi_ora))))
aggiorna_stato(equilibrio,&stat);
}
}
while (strcmp(eseguibile,"end")!=0);
kill(pid_scar,9);
kill(pid_mltcar,9);
kill(pid_med,9);
exit(0);
/*fine nodo*/
}
Ipotesi implementative | Soluzione implementativa | Guida al codice | Considerazioni