Il NS è già stato ampiamente trattato sia per il protocollo di elezione, che principalmente determina il comportamento nei confronti degli altri NS, sia per quanto riguarda il ruolo di manager del servizio.La sua implementazione è parallela: a fronte di ogni richiesta di servizio il NS genera un figlio, o quando possibile un thread, a cui demanda la escuzione del servizio.
Il protocollo di elezione non svolge nessuna funzionalità manageriale, ma esclusivamente di supporto gestendo opportunamente le richieste di elezione da parte dei NS che possono presentarsi in qualsiasi istante, anche durante il funzionamento a regime, un crash del NS freddo che richiede una recovery poco impegnativa rispetto al crash del NS caldo nella quale vengono coinvolti anche i STs.
Forse più che in codice, che comunque è riportato, è opportuno dare una descrizione sommaria del comportamento del NS:
Codice.
/* PROTOCOLLO DI ELEZIONE.
PER RAGIONI DI TEMPORIZZAZIONE (discriminazione tra la fase di elezione e la fase di regime, scadenze di time_out) SI FA USO DEI SEGNALI. SICCOME E' ONNIPRESENTE LA CONCORRENZA LA GESTIONE DI TALI SEGNALI E' MOLTO DIFFICILE ED IN QUESTO CASO POCO ELEGANTE ! */ #include <pthread.h> #include "start.h" /*contiene i file di inclusione e le strutture dei semafori*/
void catch_alrm(); void catch_alrm_cold(); void funct_bcast(); void funct_term(); void funct_replies(); void hot_monitor(); void catch_sigUSR1(); void catch_sigUSR1_child_cold(); void freddo(); void time_out(); void* thread_replies(void*); void funct_keepalive_resume(); void st(); void cliente(); void* cliente_thread(void *); void keepalive_funct(); void handler();
struct sockaddr_in baddr_el, /*socket b_cast datagram*/ repaddr, /*socket dgram che riceve le risposte*/ termaddr, /*socket dgram che risponde al terminale*/ staddr, /*socket d' ascolto dei ST*/ claddr; /*socket d' ascolto dei CLIENTS */
pthread_mutex_t indice_mutex_el=PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t indice_mutex_ns=PTHREAD_MUTEX_INITIALIZER;
MSG_EL own,cold,hot,*ptr_vect[2]; MSG_PROBE*ptr_probe; MSG_NS*ptr_ns;
FILE* fddbg;
int num_replies_received,t_out,crash,shmid_hot,shmid_cold,semid,indice, z,init=3,repsock,bsock_el,termsock,msock_hot,msock_cold,s,ls,sd,keepsock; char bufkeep[10],buffer[BUFFERLEN];
pid_t pid_freddo; fd_set read_mask,temp_mask; sigset_t mask;
main(argc,argv) int argc; char* argv[]; { int i,bsock_ns,flag=0; char linkfile[30],file[15]; struct sockaddr_in baddr_ns, /*socket b_cast datagram*/ peeraddr; /*socket peer datagram*/
sigemptyset(&mask); sigaddset(&mask,SIGALRM); sigaddset(&mask,SIGUSR1); sigset(SIGUSR1,catch_sigUSR1); signal(SIGCLD,SIG_IGN); signal(SIGALRM,catch_alrm);
clear_screen();
/*artificio che consente di porre rimedio all' eventuale contemporaneita' dei NS in ELEZIONE*/ memset(linkfile,0,30); sprintf(linkfile,"%sLocKaTo",argv[0]); if(link(argv[0],linkfile)<0) if(errno!=EEXIST) {fprintf(stdout,"%s:Errore di link\n",strerror(errno));exit(0);} else {sleep(3);flag=1;} else sleep(1);
for(i=0;i<2;i++) { ptr_vect[i]=(MSG_EL*)malloc(sizeof(MSG_EL)); memset((MSG_EL*)ptr_vect[i],0,sizeof(MSG_EL)); } memset((MSG_EL*)&hot,0,sizeof(MSG_EL)); memset((MSG_EL*)&cold,0,sizeof(MSG_EL)); memset((MSG_EL*)&own,0,sizeof(MSG_EL));
gethostname(own.host_NS,sizeof(own.host_NS)); own.porta_mntr=PORTA_MNTR; own.porta_replies=PORTA_RPLS; time(&own.tempo);
/*protocollo di elezione caldo/freddo del NS: invio di un B_CAST sulla porta 30000 ed attesa di risposta*/ if(connection(&baddr_el,&bsock_el,0,PORTA_BCAST_EL,'B',stderr)) {fprintf(stderr,"errore nella creazione della socket B_CAST\n"); exit(0);} if(sendto(bsock_el,(MSG_EL*)&own,sizeof(MSG_EL),0,(struct sockaddr *)&baddr_el, len)<0) {fprintf(stderr,"fallito l'invio del b_cast alla ricerca di un NS\n"); exit(0);}
if(make_socket(&repaddr,&repsock,own.porta_replies,'D',stderr)) {fprintf(stderr,"fallita la creazione della socket B_CAST d'ascolto\n"); exit(1);} if(make_socket(&baddr_el,&bsock_el,PORTA_BCAST_EL,'D',stderr)) {fprintf(stderr,"fallita la creazione della socket B_CAST d'ascolto\n"); exit(1);}
FD_ZERO(&read_mask); FD_SET(bsock_el,&read_mask); FD_SET(repsock,&read_mask); temp_mask=read_mask;
alarm(10);
if(!flag) /*nel caso il processo abbia effettuato il link lo elimina*/ if(unlink(linkfile)<0) {fprintf(stdout,"%s:ATTENZIONE unlink di %s fallito\n", strerror(errno),argv[0]);exit(0);}
fprintf(stderr,"\t\t\nSONO SOSPESO IN FASE DI ELEZIONE...\n"); do { select(repsock+1,&read_mask,NULL,NULL,NULL); if(FD_ISSET(repsock,&read_mask)) funct_replies(); if(FD_ISSET(bsock_el,&read_mask)) funct_bcast(); read_mask=temp_mask; } while(!t_out);
/*teminata la fase di elezione il NS può 'sopravvivere' come hot o cold*/
switch (own.stato_NS) { case 0: exit(0);
case 1: fprintf(stderr,"\tsono il NS di MIRRORING: il mio stato e %d. \n", own.stato_NS);
fprintf(stderr,"\tmi dissocio dal terminale\n"); sprintf(file,"nsdebug%d",own.stato_NS); fddbg=fopen(file,"w+"); demone(); freddo();break;
default: fprintf(stdout,"\n\tsono il NS EFFETTIVO: il mio stato e %d.\n", own.stato_NS);
fprintf(stderr,"\tmi dissocio dal terminale\n"); demone(); /*DEFINIZIONE del segmento di memoria condivisa*/ shmid_hot=shmget(SHMKEY_TAB_NS,2*sizeof(TUPLA),PERM|IPC_CREAT); ptr_tab_hot=(TUPLA(*)[2])shmat(shmid_hot,(TUPLA(*)[2])0,0); memset((TUPLA(*)[2])ptr_tab_hot,0,sizeof(TUPLA[2]));
semid=semget(SEMKEY,2,PERM|IPC_CREAT|IPC_EXCL); if(semid<0 && errno!=EEXIST) fprintf(fddbg,"%s: semget\n",strerror(errno)); else semid=semget(SEMKEY,0,0);
}
/*Questa parte di codice è eseguita soltanto dal NS EFFETTIVO*/
sprintf(file,"nsdebug%d",own.stato_NS); fddbg=fopen(file,"w+");
ptr_ns=(MSG_NS*)malloc(sizeof(MSG_NS)); memset((MSG_NS*)ptr_ns,0,sizeof(MSG_NS)); (*ptr_ns).porta_dg_cl=PORTA_DG_CL; (*ptr_ns).porta_ls_st=PORTA_LS_ST;
/*inizializza la tabella di stato*/ semop(semid,&lock_ns[0],2); for(i=0;i<2;i++) (*ptr_tab_hot)[i].porta_ka=32300+(unsigned short)i; semop(semid,&unlock_ns[0],1);
/*assegna la struttura d'indirizzo per: - la sock ST di listen di richieste di registrazione dei ST. - la sock DG di comunicazione con i CLIENTS. - la sock DG BROADCAST. - ls sock DG di comunicazione col TERMINALE.*/ if(make_socket(&staddr,&ls,PORTA_LS_ST,'S',fddbg)) {fprintf(fddbg,"fallita la creazione della socket di listen dei ST\n"); exit(1);} if(make_socket(&claddr,&sd,PORTA_DG_CL,'D',fddbg)) {fprintf(fddbg,"fallita la creazione della socket DGRAM dei CLIENTS\n"); exit(1);} if(make_socket(&baddr_ns,&bsock_ns,PORTA_BCAST_NS,'D',fddbg)) {fprintf(fddbg,"fallita la creazione della socket DGRAM B_CAST\n"); exit(1);} if(make_socket(&termaddr,&termsock,PORTA_TERM,'D',fddbg)) {fprintf(fddbg,"fallita la creazione della socket per il term\n"); fflush(fddbg); exit(1);}
FD_ZERO(&read_mask); FD_SET(bsock_el,&read_mask); FD_SET(ls,&read_mask); FD_SET(sd,&read_mask); FD_SET(bsock_ns,&read_mask); FD_SET(termsock,&read_mask); temp_mask=read_mask;
for(;;) { select(termsock+1,&read_mask,NULL,NULL,NULL); if(FD_ISSET(bsock_el,&read_mask)) funct_bcast(); if(FD_ISSET(sd,&read_mask)) cliente(); if(FD_ISSET(ls,&read_mask)) st(); if(FD_ISSET(termsock,&read_mask)) funct_term(); if(FD_ISSET(bsock_ns,&read_mask)) if(crash) crash=0; else { if(recvfrom(bsock_ns,buffer,BUFFERLEN,0,(struct sockaddr*)&peeraddr, &len)<0) fprintf(fddbg,"%s: recvfrom B_CAST\n",strerror(errno)); if(sendto(bsock_ns,ptr_ns,sizeof(MSG_NS),0,(struct sockaddr*)&peeraddr, len)<0) fprintf(fddbg,"%s: sendto B_CAST\n",strerror(errno)); } read_mask=temp_mask; } }
/********************************************************************************/ /* funzione che gestisce la ricezione delle risposte al BCAST. */ /* */ /* NB: il NS è ancora in fase di elezione quando riceve le risposte */ /* al Bcast dunque non è ancora un demone. */ /********************************************************************************/ void funct_replies() { pthread_t tid; MSG_EL *ptr_temp;
sigprocmask(SIG_BLOCK,&mask,NULL);
ptr_temp=(MSG_EL*)malloc(sizeof(MSG_EL)); memset((MSG_EL*)ptr_temp,0,sizeof(MSG_EL));
fprintf(stderr,"\nricevuta una risposta al BCAST\n"); if(recvfrom(repsock,ptr_temp,sizeof(MSG_EL),0,(struct sockaddr *)0,(int*)0)<0) {fprintf(stderr,"%s:fallita la ricezione del b_cast di elezione\n", strerror(errno));return;}
fprintf(stderr,"\tRicevuto il seguente messaggio:\n"); fprintf(stderr,"\t\tCampo host_ NS: %s\n",(*ptr_temp).host_NS); fprintf(stderr,"\t\tCampo stato_ NS: %d\n",(*ptr_temp).stato_NS); fprintf(stderr,"\t\tCampo porta_mntr: %d\n",(*ptr_temp).porta_mntr); fprintf(stderr,"\t\tCampo porta_replies: %d\n",(*ptr_temp).porta_replies); fprintf(stderr,"\t\tCampo tempo: %d\n",(*ptr_temp).tempo);
pthread_create(&tid,NULL,&thread_replies,ptr_temp);
sigprocmask(SIG_UNBLOCK,&mask,NULL); }
/*****************************************************************************/ /* thread_replies (): codice eseguito dal thread */ /* */ /* Il codice viene eseguito con la maschera dei segnali BLOCCATA */ /*****************************************************************************/ void* thread_replies(void* ptr_thread) { pthread_detach(pthread_self());
fprintf(stderr,"\tTHREAD di BCAST:gestisco la risposta\n");
pthread_mutex_lock(&indice_mutex_el); init-=((MSG_EL*)ptr_thread)->stato_NS;
switch(((MSG_EL*)ptr_thread)->stato_NS) { case 3: fprintf(stderr,"\tRisposta ottenuta dal NS EFFETTIVO \"riflesso\".\n"); break; case 2: fprintf(stderr,"\tRisposta ottenuta dal NS EFFETTIVO.\n"); break; case 1: fprintf(stderr,"\tRisposta ottenuta dal NS MIRROR.\n"); break; }
fprintf(stderr,"Il numero di risposte ricevute è: %d\n\n", num_replies_received+1);
switch (num_replies_received) { case 0: put_into(ptr_vect[0],(MSG_EL*)ptr_thread); num_replies_received++;break;
case 1: if(((MSG_EL*)ptr_thread)->tempo < ptr_vect[0]->tempo) { put_into(ptr_vect[1],ptr_vect[0]); put_into(ptr_vect[0],(MSG_EL*)ptr_thread); } else put_into(ptr_vect[1],(MSG_EL*)ptr_thread);
num_replies_received++;break;
default: if(((MSG_EL*)ptr_thread)->tempo < ptr_vect[0]->tempo) { put_into(ptr_vect[1],ptr_vect[0]); put_into(ptr_vect[0],(MSG_EL*)ptr_thread); } else if(((MSG_EL*)ptr_thread)->tempo < ptr_vect[1]->tempo) put_into(ptr_vect[1],(MSG_EL*)ptr_thread);
num_replies_received++; }
pthread_mutex_unlock(&indice_mutex_el);
free((MSG_EL*)ptr_thread); pthread_exit(NULL); }
/*****************************************************************************/ /* funzione che riceve i BCASTs inviati dagli altri NS */ /*****************************************************************************/ void funct_bcast() { int s; struct sockaddr_in peeraddr; MSG_EL *ptr_temp; pthread_t tid; FILE* stream;
/*Questo controllo serve per impedire che il NS CALDO si sospenda erroneamente in attesa di ricezione di un Bcast che non è stato inviato. Questo funzionamento anomalo si ha quando il figlio del NS CALDO di accorge che il NS FREDDO è in crash ed invia un segnale al NS CALDO il quale in questo caso è sospeso sulla select.Sembra che il segnale ricevuto dal NS CALDO abbia l' effetto di "settare tutti i bits della maschera della SELECT*/
if (crash) return;
/*Se si è a regime si deve ridirigere su file*/ if(!t_out) stream=stdout; else stream=fddbg;
sigprocmask(SIG_BLOCK,&mask,NULL);
ptr_temp=(MSG_EL*)malloc(sizeof(MSG_EL)); memset((MSG_EL*)ptr_temp,0,sizeof(MSG_EL));
fprintf(stream,"\nricevuto un BCAST..."); if(recvfrom(bsock_el,ptr_temp,sizeof(MSG_EL),0,(struct sockaddr *)0,(int*)0)<0) {fprintf(stream,"%s:fallita la ricezione del b_cast di elezione\n", strerror(errno));fflush(stream);return;}
if(connection(&peeraddr,&s,ptr_temp->host_NS,ptr_temp->porta_replies, 'D',stream)) {fprintf(stream,"ATTENZIONE: errore nella creazione della socket di \ risposta alB_CAST\n");fflush(stream);}
sleep(1);
if(sendto(s,(MSG_EL*)&own,sizeof(MSG_EL),0,(struct sockaddr *)&peeraddr,len)<0) {fprintf(stream,"fallito l'invio del b_cast alla ricerca di un NS\n"); fflush(stream);}
fprintf(stream,"Il valore dello stato è %d inviata la risposta.\n", own.stato_NS);fflush(stream);
/*Il valore 5 identifica il terminale che non deve essere scambiato come un NS*/ if(ptr_temp->stato_NS!=5 && own.stato_NS==CALDO) { fprintf(stream,"CALDO: rispondo al primo BCAST\n");fflush(stream); put_into(&cold,ptr_temp); own.stato_NS=REFLEX; if(fork()==0) hot_monitor(); }
sigprocmask(SIG_UNBLOCK,&mask,NULL); close(s);free(ptr_temp);
}
/******************************************************************************/ /* catch di SIGALRM del NS in elezione */ /******************************************************************************/ void catch_alrm() { int i; pthread_t tid;
t_out++;
if(init==3) /*hanno risposto al BCAST di elezione soltanto NS in elezione*/ switch (num_replies_received) { case 0: own.stato_NS=CALDO; break;
case 1: if(own.tempo < ptr_vect[0]->tempo) { own.stato_NS=REFLEX; put_into(&cold,ptr_vect[0]); if(fork()==0) hot_monitor(); } else { own.stato_NS=FREDDO; put_into(&hot,ptr_vect[0]); } break;
default: /*ha ricevuto 2 o più risposte: occorre controllare se è presente quella del proprio figlio*/ if(own.tempo < ptr_vect[0]->tempo) { put_into(&cold,ptr_vect[0]); own.stato_NS=REFLEX; if(fork()==0) hot_monitor(); } else if(own.tempo < ptr_vect[1]->tempo) { own.stato_NS=FREDDO; put_into(&hot,ptr_vect[0]); } } else /*ha risposto uno o più NS a regime*/ switch(init) { case 1: own.stato_NS=FREDDO; put_into(&hot,ptr_vect[0]); break;
case 2: fprintf(stdout,"ATTENZIONE fase TRANSITORIA del sistema\n\n"); exit(0);break;
default: if(num_replies_received==1) fprintf(stdout,"ATTENZIONE fase TRANSITORIA del sistema\n\n"); else fprintf(stdout,"\n\t\tSISTEMA a REGIME\n\n\n"); }
for(i=0;i<2;i++) free(ptr_vect[i]);
FD_CLR(repsock,&read_mask); FD_CLR(bsock_el,&read_mask); }
/******************************************************************************/ /* hot_monitor() : codice eseguito dal figlio del NS EFFETTIVO */ /******************************************************************************/ void hot_monitor() { struct sockaddr_in addr_cold,addr_hot; int i; char probe[10]; FILE* stream;
sigprocmask(SIG_UNBLOCK,&mask,NULL); /*E' fondamentale sbloccare la maschera in quanto il "figlio di monitor" può essere generato quando il padre (NS CALDO) ha blockato il segnale di SIGALRM. Tale blocco modifica la maschera dei segnali del padre che viene ereditata dal figlio. La sigprocmask() serve appunto per ripristinare la maschera del figlio nella comdizione di nessun segnale bloccato.*/
/*Se si è a regime si deve ridirigere su file*/ if(!t_out) stream=stdout; else stream=fddbg;
fprintf(stream,"\n\tFIGLIO: Mi connetto al NS DI MIRRORING\n\n");fflush(stream); signal(SIGALRM,time_out);
if(connection(&addr_cold,&msock_cold,cold.host_NS,\ cold.porta_mntr,'D',stream)) {fprintf(stream,"Errore nella connessione al NS caldo\n");fflush(stream); return;}
if(make_socket(&addr_hot,&msock_hot,own.porta_mntr,'D',stream)) {fprintf(stream,"fallita la creazione della socket B_CAST d'ascolto\n"); fflush(stream);return;}
alarm(15);
for(;;) { if(recvfrom(msock_hot,probe,10,0,(struct sockaddr *)0,(int*)0)<0) {fprintf(stream,"%s: Errore di ricezione del PROBE dal NS FREDDO\n", strerror(errno));fflush(stream);}
semop(semid,&lock_ns[0],2); if(sendto(msock_cold,ptr_tab_hot,sizeof(TUPLA[2]),0, (struct sockaddr *)&addr_cold,len)<0) {fprintf(stream,"fallito l'invio dello stato di monitor\n");fflush(stream);} semop(semid,&unlock_ns[0],1);
alarm(15); } }
/******************************************************************************/ /* time_out(): codice eseguito dalla FIGLIO in caso di crash del NS FREDDO */ /******************************************************************************/ void time_out() { kill(getppid(),SIGUSR1); close(msock_hot);close(msock_cold); exit(0); }
/******************************************************************************/ /* catch_sigUSR1(): handler di SIGUSR1 eseguito dal padre */ /******************************************************************************/ void catch_sigUSR1() { own.stato_NS=CALDO;crash=1; fprintf(fddbg,"\n\tATTENZIONE: Crash del NS FREDDO. \ Termino la comunicazione!\n");fflush(fddbg); }
/******************************************************************************/ /* NS freddo */ /******************************************************************************/ void freddo() { struct sockaddr_in addr_cold,addr_hot; int i; char probe[10];
/*DEFINIZIONE del segmento di memoria condivisa*/ shmid_cold=shmget(SHMKEY_TAB_COLD,2*sizeof(TUPLA),PERM|IPC_CREAT); ptr_tab_cold=(TUPLA(*)[2])shmat(shmid_cold,(TUPLA(*)[2])0,0); semid=semget(SEMKEY,0,0);
/*Quando un NS viene eletto come MIRROR non esce da questa funzione finchè non diviene effettivo. In questa funzione il padre si occupa della funzionalità di mirroring mentre il figlio risponde ai Bcasts e alle richieste del terminale*/
if((pid_freddo=fork())==0) /* codice eseguito dalla FIGLIO*/ { signal(SIGUSR1,catch_sigUSR1_child_cold);
if(make_socket(&termaddr,&termsock,PORTA_TERM,'D',fddbg)) {fprintf(fddbg,"fallita la creazione della socket B_CAST d'ascolto\n"); fflush(fddbg); exit(1);}
FD_ZERO(&read_mask); FD_SET(bsock_el,&read_mask); FD_SET(termsock,&read_mask); temp_mask=read_mask;
for(;;) { select(termsock+1,&read_mask,NULL,NULL,NULL); if(FD_ISSET(bsock_el,&read_mask)) funct_bcast(); if(FD_ISSET(termsock,&read_mask)) funct_term(); read_mask=temp_mask; } } else /*codice del PADRE*/ { signal(SIGALRM,catch_alrm_cold);
if(connection(&addr_hot,&msock_hot,hot.host_NS,hot.porta_mntr, 'D',fddbg)) {fprintf(fddbg,"Errore nella connessione al NS caldo\n");fflush(fddbg); return;}
if(make_socket(&addr_cold,&msock_cold,own.porta_mntr,'D',fddbg)) {fprintf(fddbg,"fallita la creazione della socket B_CAST d'ascolto\n"); fflush(fddbg);return;}
alarm(15); for(;;) { sleep(5); if(sendto(msock_hot,probe,10,0,(struct sockaddr *)&addr_hot,len)<0) {fprintf(fddbg,"NS FREDDO fallito l'invio del probe\n");fflush(fddbg);}
semop(semid,&lock_cold[0],2); if(recvfrom(msock_cold,ptr_tab_cold,sizeof(TUPLA[2]),0, (struct sockaddr *)0,(int*)0)<0 && errno!=EINTR) {fprintf(fddbg,"fallita la ricezione dello stato come monitor: %s\n", strerror(errno));fflush(fddbg);} semop(semid,&unlock_cold[0],1);
if(own.stato_NS==CALDO) { semop(semid,&lock_cold[0],2); semop(semid,&lock_ns[0],2); for(i=0;i<2;i++) { (*ptr_tab_hot)[i].stato=(*ptr_tab_cold)[i].stato; strcpy((*ptr_tab_hot)[i].host,(*ptr_tab_cold)[i].host); strcpy((*ptr_tab_hot)[i].porta_cl,(*ptr_tab_cold)[i].porta_cl); strcpy((*ptr_tab_hot)[i].porta_ud,(*ptr_tab_cold)[i].porta_ud); strcpy((*ptr_tab_hot)[i].porta_res,(*ptr_tab_cold)[i].porta_res); (*ptr_tab_hot)[i].porta_ka=(*ptr_tab_cold)[i].porta_ka; } semop(semid,&unlock_cold[0],1); shmdt((char *)ptr_tab_cold); shmctl(shmid_cold,IPC_RMID,(struct shmid_ds *)0); semop(semid,&unlock_ns[0],1);
funct_keepalive_resume(); return; } alarm(15); } } }
/******************************************************************************/ /* Handler di SIGUSR1 del FIGLIO del NS freddo */ /******************************************************************************/ void catch_sigUSR1_child_cold() {exit(0);}
/******************************************************************************/ /* Handler di SIGALRM del NS freddo */ /******************************************************************************/ void catch_alrm_cold() { semop(semid,&unlock_cold[0],1);
kill(pid_freddo,SIGUSR1); waitpid(pid_freddo,0,0);
own.stato_NS=CALDO; close(msock_hot);close(msock_cold); fprintf(fddbg,"\nChrash del NS EFFETTIVO. Mi \"autoeleggo\" EFFETTIVO. \ Il mio stato e %d.\n",own.stato_NS);fflush(fddbg);
/*DEFINIZIONE del segmento di memoria condivisa*/ shmid_hot=shmget(SHMKEY_TAB_NS,2*sizeof(TUPLA),PERM|IPC_CREAT); ptr_tab_hot=(TUPLA(*)[2])shmat(shmid_hot,(TUPLA(*)[2])0,0); memset((TUPLA(*)[2])ptr_tab_hot,0,sizeof(TUPLA[2])); semop(semid,&lock_cold[0],2); }
/******************************************************************************/ /* funzione che risponde alla richiesta del terminale */ /******************************************************************************/ void funct_term() { struct sockaddr_in peeraddr; char buf[10];
if(crash) return;
sigprocmask(SIG_BLOCK,&mask,NULL);
if(recvfrom(termsock,buf,10,0,(struct sockaddr *)&peeraddr,&len)<0) {fprintf(fddbg,"fallita la ricezione dello stato come monitor: %s\n", strerror(errno));fflush(fddbg);}
if(own.stato_NS > 1) /*risponde alla richiesta il NS EFFETTIVO*/ { semop(semid,&lock_ns[0],2); if(sendto(termsock,ptr_tab_hot,sizeof(TUPLA[2]),0, (struct sockaddr *)&peeraddr,len)<0) {fprintf(fddbg,"fallito l'invio dello stato di monitor\n"); fflush(fddbg);} semop(semid,&unlock_ns[0],1); } else /*risponde alla richiesta il NS di MIRROR*/ { semop(semid,&lock_cold[0],2); if(sendto(termsock,(TUPLA(*)[2])ptr_tab_cold,sizeof(TUPLA[2]),0, (struct sockaddr *)&peeraddr,len)<0) {fprintf(fddbg,"fallito l'invio dello stato di monitor\n");fflush(fddbg);} semop(semid,&unlock_cold[0],1); }
sigprocmask(SIG_UNBLOCK,&mask,NULL); }
/****************************************************************************/ /* st() */ /****************************************************************************/
/* questa funzione si occupa della gestione delle richieste di registrazione dei st. Riceve un messaggio del tipo: <host ST> <porta_cl> <porta_ud> char[24] u_short u_short */
void st() { struct sockaddr_in peeraddr; /* e' la socket associata al peer */ struct hostent* hp; char *host_ST,buf[BUFLEN],buffer[BUFFERLEN];
int nf,lenght;
if(crash) return;
sigprocmask(SIG_BLOCK,&mask,NULL);
memset(buffer,0,BUFFERLEN);memset(buf,0,BUFLEN);
/*accetta una richieta di connesssione da perte di un ST e cerca informazioni sul suo indirizzo internet*/ if((s=accept(ls,(struct sockaddr *)&peeraddr,&len))==-1) {fprintf(fddbg,"accept ls");return;} lenght=ntohl(peeraddr.sin_addr.s_addr); hp=gethostbyaddr((char*)&lenght,sizeof(struct in_addr),peeraddr.sin_family); if(hp==NULL) host_ST=(char *)inet_ntoa(peeraddr.sin_addr); else host_ST=hp->h_name; fprintf(fddbg,"\n\nRichiesta di connessione dal nodo %s ;porta %u \n", host_ST,ntohs(staddr.sin_port)); fflush(fddbg);
if(fork()==0) /*codice del FIGLIO*/ { close(ls); if(recv(s,buf,BUFLEN,0)<BUFLEN) {fprintf(fddbg,"%s: errore di ricezione\n",strerror(errno));exit(0);}
/*controlla lo stato del servizio: z=2 NON è possibile registrare il ST*/ semop(semid,&lock_ns[0],2); for(z=0;z<2;z++) if(!(*ptr_tab_hot)[z].stato) break; semop(semid,&unlock_ns[0],1);
fprintf(fddbg,"NS: Il volore di z che viene utilizzato dal FIGLIO e': %d\n", z); fflush(fddbg);
ptr_probe=(MSG_PROBE*)malloc(sizeof(MSG_PROBE)); memset((MSG_PROBE*)ptr_probe,0,sizeof(MSG_PROBE));
if(z==2) { (*ptr_probe).stato=0; if(send(s,ptr_probe,sizeof(MSG_PROBE),0)!=sizeof(MSG_PROBE)) {fprintf(fddbg,"%s: errore di trasmissione\n",strerror(errno)); fflush(fddbg);close(s);} fprintf(fddbg,"NS FIGLIO: Il messaggio inviato e':\n"); fprintf(fddbg,"\t\tstato %d\n",(*ptr_probe).stato); fprintf(fddbg,"\t\tporta_ka %d\n",(*ptr_probe).porta_ka); fflush(fddbg);close(s); exit(0); } else { (*ptr_probe).stato=1; semop(semid,&lock_ns[0],2); (*ptr_probe).porta_ka=(*ptr_tab_hot)[z].porta_ka; semop(semid,&unlock_ns[0],1); if(send(s,ptr_probe,sizeof(MSG_PROBE),0)!=sizeof(MSG_PROBE)) {fprintf(fddbg,"%s: errore di trasmissione\n",strerror(errno)); fflush(fddbg);close(s);exit(0);} fprintf(fddbg,"NS FIGLIO: Il messaggio inviato è :\n"); fprintf(fddbg,"\t\tstato %d\n",(*ptr_probe).stato); fprintf(fddbg,"\t\tporta_ka %d\n",(*ptr_probe).porta_ka); fflush(fddbg);
if(recv(s,buffer,BUFFERLEN,0)!=BUFFERLEN) {fprintf(fddbg,"%s: errore di ricezione\n",strerror(errno)); fflush(fddbg);close(s);exit(0);} close(s);
semop(semid,&lock_ns[0],2); (*ptr_tab_hot)[z].stato=1; strcpy((*ptr_tab_hot)[z].host,host_ST); nf=sscanf(buf," %s %s %s ",(*ptr_tab_hot)[z].porta_cl, (*ptr_tab_hot)[z].porta_ud,(*ptr_tab_hot)[z].porta_res); if(nf!=3) {fprintf(fddbg,"ATTENZIONE: errore di scompattazione dati ST !\n"); fflush(fddbg);} semop(semid,&unlock_ns[0],1);
free(ptr_probe); keepalive_funct(); } } else /*codice del NS*/ { close(s); sigprocmask(SIG_UNBLOCK,&mask,NULL); return; } }
/**************************************************************************/ /* KEEPALIVE: codice eseguito dal figlio. */ /**************************************************************************/ void keepalive_funct() { struct sockaddr_in keepaddr, /*socket di keepalive*/ peeraddr;
sigprocmask(SIG_UNBLOCK,&mask,NULL); signal(SIGALRM,handler);
/*crea la socket D_GRAM di KeepAlive ed attende PROBE dal ST*/
semop(semid,&lock_ns[0],2); if(make_socket(&keepaddr,&keepsock,(*ptr_tab_hot)[z].porta_ka,'D',fddbg)) {fprintf(fddbg,"errore nella creazione della socket D_GRAM di KA\n"); fflush(fddbg);exit(1);} semop(semid,&unlock_ns[0],1);
/*ciclo infinito dal quale il figlio esce solamente in caso di SIGALRM gestito dall'handler*/ alarm(20); for(;;) { if(recvfrom(keepsock,bufkeep,10,0,(struct sockaddr*)0,(int*)0)!=10) fprintf(fddbg,"\n\n\tFIGLIO: errore di ricezione di KA da \n"); alarm(7); } }
/****************************************************************************/ /* KEEPALIVE RESUME: codice eseguito dal NS FREDDO per ripristinare lo */ /* di KeepAlive dei ST ATTIVI. */ /****************************************************************************/ void funct_keepalive_resume() { int i,s; struct sockaddr_in resaddr;
semop(semid,&lock_ns[0],2);
for(z=0;z<2;z++) if((*ptr_tab_hot)[z].stato) if(fork()==0) /*codice del FIGLIO*/ { if(connection(&resaddr,&s,(*ptr_tab_hot)[z].host, atoi((*ptr_tab_hot)[z].porta_res),'S',fddbg)) { fprintf(fddbg,"FIGLIO di RESUME: errore nella connessione di resume.\ Annullo la entry nella tabella e considero il ST corrispondente \"caduto\",\ quindi termino!\n"); memset((TUPLA *)&(*ptr_tab_hot)[z],0,sizeof(TUPLA)); exit(1); } ptr_probe=(MSG_PROBE*)malloc(sizeof(MSG_PROBE)); memset((MSG_PROBE*)ptr_probe,0,sizeof(MSG_PROBE)); (*ptr_probe).stato=1; (*ptr_probe).porta_ka=(*ptr_tab_hot)[z].porta_ka;
if(send(s,ptr_probe,sizeof(MSG_PROBE),0)!=sizeof(MSG_PROBE)) { fprintf(fddbg,"FIGLIO di RESUME: errore nella trasmissione del messaggio di resume. Annullo la entry nella tabella e considero il ST corrispondente \"caduto\" quindi ritorno\n"); memset((TUPLA *)&(*ptr_tab_hot)[z],0,sizeof(TUPLA));return; }
if(recv(s,buffer,BUFFERLEN,0)!=BUFFERLEN) { fprintf(fddbg,"FIGLIO di RESUME: errore nella ricezione del messaggio di pronto. Annullo la entry nella tabella e considero il ST corrispondente \"caduto\" quindi termino!\n"); memset((TUPLA *)&(*ptr_tab_hot)[z],0,sizeof(TUPLA));return; }
close(s);free(ptr_probe); keepalive_funct(); } else continue; semop(semid,&unlock_ns[0],1); }
/****************************************************************************/ /* HANDLER DI SIGALRM */ /****************************************************************************/ void handler() { /*in caso di crash il FIGLIO deve annullare la entry opportuna nella tabella e terminare*/ semop(semid,&lock_ns[0],2);
fprintf(fddbg,"\n\n\tFIGLIO: scaduto il timeout di ricezione : crash di %d\n", (*ptr_tab_hot)[z].porta_ka);
memset((TUPLA *)&(*ptr_tab_hot)[z],0,sizeof(TUPLA)); (*ptr_tab_hot)[z].porta_ka=32300+(unsigned short)z;
semop(semid,&unlock_ns[0],1);
close(keepsock); fprintf(fddbg,"\tFIGLIO: ho chiuso tutte la socket, ora termino\n"); fflush(fddbg);exit(0); }
/****************************************************************************/ /* cliente(): codice eseguito NS */ /****************************************************************************/ void cliente() { pthread_t tid; memset(buffer,0,BUFFERLEN);
if(crash) return;
sigprocmask(SIG_BLOCK,&mask,NULL);
/*il NS riceve dal CLIENTS un messaggio a significato "implicito"*/ if(recvfrom(sd,buffer,BUFFERLEN,0,(struct sockaddr *)&claddr,&len)<0) {fprintf(fddbg,"%s: recvfrom cliente\n",strerror(errno)); fflush(fddbg); return;}
pthread_create(&tid,NULL,&cliente_thread,NULL);
sigprocmask(SIG_UNBLOCK,&mask,NULL); }
/****************************************************************************/ /* cliente_thread(): codice eseguito dal thread */ /****************************************************************************/
/* questa funzione fornisce al cliente l' indirizzo di un st se disponibile. Nel caso di st disponibile il messaggio di risposta e'
<host ST> <porta_cl> 10 10 caratteri (compreso '\0') */
void* cliente_thread(void* ptr_tab_hot_thread) { int flag=1; char buf[BUFLEN];
memset(buf,0,BUFLEN);
pthread_detach(pthread_self());
semop(semid,&lock_ns[0],2); pthread_mutex_lock(&indice_mutex_ns);
/* equa distribuzione del servizio quando pessibile */ if(((*ptr_tab_hot)[0].stato)&&((*ptr_tab_hot)[1].stato)) indice=indice ? 0:1; else if((*ptr_tab_hot)[0].stato) indice=0; else if((*ptr_tab_hot)[1].stato) indice=1; else flag=0;
pthread_mutex_unlock(&indice_mutex_ns); semop(semid,&unlock_ns[0],1);
if(flag) /*flag segnala lo stato del servizio*/ sprintf(buf,"%s %s",(*ptr_tab_hot)[indice].host, (*ptr_tab_hot)[indice].porta_cl); else strcpy(buf,"*"); if((sendto(sd,buf,BUFLEN,0,(struct sockaddr *)&claddr,len))<0) {fprintf(fddbg,"socket datagram\n");fflush(stderr);}
pthread_exit(0); }