//tale thread ha il compito di inoltrare la richiesta agli Slave e di recuperarne i risultati appena diponibili. //a tal punto invochera' la corretta routine di restituzione del risultato al mittente(Client o Slave) import java.io.*; import java.net.*; import java.util.*; //N.B. in caso di errore no exit ma return public class inoltraQuerySlave extends Thread{ private int index; private String stringFull=""; private String richiesta; private String mail; private int numVivi=0; private int attesaLivelli=0; public inoltraQuerySlave(int arg_index, String arg_string, String arg_mail, int arg_numVivi){ super(); numVivi=arg_numVivi;//diverso da zero se la richiesta è stata recuperata in caso di rielezione index = arg_index; Constants.prn(index+"-sono dentro a inoltraQuery versione 5.7"); mail=arg_mail; System.out.println(index+"-per la corrente richiesta mando la porta: " + (ConstantsM.contactSlaveVivi+index)); stringFull = arg_string + "\n" + (ConstantsM.contactSlaveVivi+index);//agli slave devo passare la stringa e lA PORTA SU CUI ATTENDERA' System.out.println(index+"-per la corrente richiesta mando la RICHIESTA: " + stringFull); richiesta = arg_string; int valTtl=Integer.parseInt( richiesta.substring(richiesta.lastIndexOf("\n")+1, richiesta.length())); attesaLivelli=Constants.attesa1liv * (2*valTtl+1); /*graficamente si vede (x il master): n.b. il ttl arriva già decrementato!! 0---->1 ramo da attendere 1---->3 rami da attendere 2---->5 rami da attendere 3---->7 rami da attendere e così via..... graficamente n.rami=1+ttl*2 */ } public void run(){ gestoreMail gm = new gestoreMail(mail); String rispQuery = this.inoltraQuery(); if (rispQuery!="errore"){ System.out.println(index+"-faccio la set nella cache del risultato"); ConstantsM.cm.set(richiesta, rispQuery); } else{ System.out.println(index+"-faccio la remove della richiesta dalla cache"); ConstantsM.cm.remove(richiesta); } //in rispQuery ho file oppure "errore" //mando risultati: in mail ho l'email del client o "SLAVE" if (mail.equals("SLAVE")){ //devo trasferire i file allo slave via UDP e TCP!!! //mandaFileSlave(richiesta); System.out.println(index+"vado in sandSLave"); sendSlave.sendRisultati(rispQuery, richiesta); ConstantsM.numRichiesteSlave--; } else{ int i = gm.sendMail(rispQuery); ConstantsM.numRichiesteClient--; } System.out.println(index+"-pronto per nuove richieste"); } public String inoltraQuery(){ contactSlave cs = new contactSlave(); byte[] data; byte[] buffer; DatagramSocket ds; DatagramPacket input; //manda le richieste e cerca di ottenere gli Ack massimo 3 volte try{ buffer = new byte[65507]; ds = new DatagramSocket(ConstantsM.contactSlaveUDP+index); ds.setSoTimeout(Constants.attesaNRec);//timeout senno' mi blocco x sempre!!! data = stringFull.getBytes(); System.out.println(index+"-attendo 1° ack su: " + (ConstantsM.contactSlaveUDP+index)); } catch (Exception e){ System.out.println(index+"-errore creazione socket dgram per contattare gli Slave: " + e); return "errore"; } //creo già la socket x quelli che sarannio "vivi" ion modo che se rispondono prima che io abbia finito il ciclo dei 3 ack non perdo i messaggi DatagramSocket dsVivi=null; try{ dsVivi = new DatagramSocket(ConstantsM.contactSlaveVivi+index); System.out.println(index+"-attendo 2° ack su: " + (ConstantsM.contactSlaveVivi+index)); } catch (Exception e){ System.out.println(index+"-errore creazione socket dgram per slave 'vivi': " + e); return "errore"; } //se tale variabile e' diversa da zero vuol dire che sono stato chiamato all'atto di una rieleazione //quindi devo solo aspettare le risposte, "salto" al blocco dopo! //sarebbe da rifare con due metodi distinti! if (numVivi==0){ while (cs.isPossibleAck()){ input = new DatagramPacket(buffer, buffer.length); int numSend=0; Enumeration e; for (e = cs.takeSlaves() ; e.hasMoreElements() ;) { KeySlave arg = (KeySlave)e.nextElement(); if (cs.isPossibleAck(arg)){ try{ InetAddress dest = InetAddress.getByName(arg.ipSlave); int porta = arg.portaSlave; DatagramPacket output = new DatagramPacket(data, data.length, dest, porta); System.out.println(index+"-sendo su: " + arg.ipSlave + ", porta " + arg.portaSlave + " il messaggio: " + (new String(data,0,data.length))); ds.send(output); } catch (UnknownHostException uhe){ cs.removeSlave(arg); continue; } catch (IOException ioe){ //c'è stato un errore di io indipendente dalla rete o dallo slave, è un problema mio //riprovero' dopo, al prossimo ciclo while continue; } cs.incTrasm(arg); numSend++; } }//fine for // a questo punto ho mandato le richiesta agli slave vivi, devo prendere gli ack!! if (numSend==0) return "errore"; long prima=0; long dopo =0; long timeRemain = Constants.attesaNRec; //attendo il numero di richieste che ho mandato!!! for (int i=0;i<numSend;i++){ try{ if (dopo>0) timeRemain = timeRemain - (dopo-prima); dopo=0; if (timeRemain<=0) break; ds.setSoTimeout((int)timeRemain);//timeout senno' mi blocco x sempre!!! prima = System.currentTimeMillis(); System.out.println(index+"-receive ack"); ds.receive(input);//se non risponde dopo x msec catch eccezione dopo = System.currentTimeMillis(); String s1 = new String(input.getData(), 0, input.getLength()); System.out.println(index+"----risposta slave: " + s1 + "---"); //controllo la risposta dello slave if (s1.equals("sono pieno")){ KeySlave ks = new KeySlave(input.getAddress().getHostAddress(), input.getPort()); cs.removeSlave(ks); } else{ KeySlave ks2 = new KeySlave(input.getAddress().getHostAddress(), input.getPort()); cs.setAck(ks2); } } catch (InterruptedIOException iioe){ System.out.println(index+"-e' finito il tempo concesso per ricevere gli ack"); //è finito il tempo totale concesso x le risposte!!! break;//esco dal for e torno nel while } catch (IOException ioe){ continue; }// fine blocco catch }//fine for }//fine while //a tal punto chi mi ha risposto mi ha risposto!!, gli altri sono morti!!! numVivi = cs.removeDeath(); if (numVivi==0) return "errore"; }// fine blocco se non ero avviato da rielezione //rielezione ConstantsM.cm.setVivi(richiesta, numVivi);//dico alla cache e quindi agli slave che se muoi si devono aspettare tali slave System.out.println(index+"-attendo su " + dsVivi.getLocalPort()); //a questo punto ho fatto partire tutte le richieste, devo aspettare sulla porta contactSlaveVivi+index che tutti gli Slave mi rispondano PipedReader[] pipeInp = new PipedReader[numVivi]; PipedWriter[] pipeOut = new PipedWriter[numVivi]; boolean[] good = new boolean[numVivi];//indice se si è verificato un errore durante la lettura dalla pipe e non lo considero piu' for (int g=0; g<numVivi; g++) good[g]=true; buffer = new byte[65507]; input = new DatagramPacket(buffer, buffer.length); int numViviVivi=0;//sono quelli che effettivamente finiranno e mi vorranno mandare il file via TCP long prima=0; long dopo =0; long timeRemain = attesaLivelli;//il tempo di ateesa è proporzionale al numero di livelli da esplorare! try{dsVivi.setSoTimeout((int)timeRemain);}catch (Exception e){} scaricaFile[] sf = new scaricaFile[numVivi]; int h=0; for (int i=0; i<numVivi; i++){ try{ if (dopo>0) timeRemain = timeRemain - (dopo-prima); dopo=0; if (timeRemain<=0) break; dsVivi.setSoTimeout((int)timeRemain);//timeout senno' mi blocco x sempre!!! prima = System.currentTimeMillis(); System.out.println("attendo gli ack degli slave 'vivi' che hanno finito..."); input = new DatagramPacket(buffer, buffer.length); try{ dsVivi.receive(input);//se non risponde dopo x msec catch eccezione } catch (InterruptedIOException iioe){ System.out.println(index+"-e' finito il tempo concesso per ricevere gli ack"); break; } System.out.println(index+"-ricevuto ack da " + input.getAddress().getHostAddress()); dopo = System.currentTimeMillis(); //controllo se lo slave è ritenuto "vivo" dal sistema //String arg = input.getAddress().getHostAddress(); //COME DATO LO SLAVE MI RESTITUISCE LA PORTA SU CUI ASCOLTAVA LA RICHIESTA!!! KeySlave ks = new KeySlave(input.getAddress().getHostAddress(), Integer.parseInt(new String(input.getData(), 0, input.getLength()))); if (cs.isPresent(ks)){ pipeInp[h] = new PipedReader(); pipeOut[h] = new PipedWriter(); InetAddress destAck = input.getAddress(); data = "ack".getBytes(); DatagramPacket output = new DatagramPacket(data, data.length, destAck, input.getPort()); dsVivi.send(output); System.out.println(index+"-parte il Thread " + h + "per scaricare da: "+input.getAddress().getHostAddress()+"\nAttendo su Pipe: " + pipeInp[h]); int portScarica=0; //la porta TCP è la stessa UDP da cui ha spedito!! portScarica = input.getPort();//Integer.parseInt(new String(input.getData(), 0, input.getLength())); sf[h] = new scaricaFile(h, input, pipeInp[h], pipeOut[h], index, portScarica ); if (sf[h].okPipe){ sf[h].start(); numViviVivi++; h++; } } } catch (IOException e){ break; } }//fine for h--;//a questo punto i thread e le pipe vanno effettivamente da 0 a numViviVivi!!! //ho fatto partire tutti i thread che devono scaricare i file int numCompl=0;//numero dei Thread che hanno spedito correttamente il file System.out.println(index+"-attendo i thread scaricaFile che sono" + numViviVivi); while (numCompl<numViviVivi){ for (int i=0; i<numViviVivi; i++){ try{ if (good[i]){ if (pipeInp[i].ready()){//il thread i ha finito ed è 'buono' int conf = pipeInp[i].read(); System.out.println(index+"-il Thread ScaricaFile " + i + " ha finito"); pipeOut[i].write(conf); numCompl++; good[i]=false; } } } catch(Exception e){ System.out.println(index+"-inoltra query, sto aspettando che finiscano!!errore1:"+e); } } } //hanno finito tutti di scaricare!!! String nomeFile = eliminaRidondanze(numViviVivi); System.out.println(index+"-il file da mandare e' in : " + nomeFile); return nomeFile; }//fine metodo inoltraQuery private String eliminaRidondanze(int numFile){ //i nomi dei file in input sono "index-0.dat" fino a "index-numFile.dat" fondiFile ff = new fondiFile(numFile, index); return ff.esegui(); } }//fine class principale //class che scarica il file dallo slave class scaricaFile extends Thread{ private DatagramPacket msg; int num; PipedReader Inp; PipedWriter Out; public boolean okPipe=true; private int index; private int port; scaricaFile(int i,DatagramPacket input, PipedReader pipeInp, PipedWriter pipeOut, int arg_index, int arg_port ){ super(); index=arg_index; msg=input; num=i; if (arg_port==0) okPipe=false; else port=arg_port; try{ Inp=new PipedReader(pipeOut); Out=new PipedWriter(pipeInp); } catch(Exception e) { okPipe=false; } }//fine costruttore public void run(){ if (okPipe){ BufferedReader inp=null; PrintWriter out=null; Socket socket=null; PrintWriter dest=null; try{ //creo connessione tcp System.out.println(index+"-devo scaricare il file dallo Slave e mandarlo in file, sono il thread x "+msg.getAddress().getHostAddress() + "-"+msg.getPort()); Constants.prn(index+"-arrivata porta: "+ port); socket = new Socket(msg.getAddress().getHostAddress(), port); //debug try{socket.setSoTimeout(0);}catch (Exception e){} inp = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); out.println("ok"); out.flush(); dest = new PrintWriter(new FileWriter(index + "-"+ num + ".dat")); //debug socket.setSoTimeout(Constants.attesaRec); while (!inp.ready()){}//attesa attiva risposta server //while (inp.ready()){// acquisizione risposta server fino alla fine, comprensiva anche di più linee!! while (true){// acquisizione risposta server fino alla fine, comprensiva anche di più linee!! try { //System.out.println("vado in attesa"); String linea = inp.readLine(); if (linea.equals("..")){ System.out.println(index+"-FINE FILE"); break; } dest.println(linea); System.out.println(index+"-"+msg.getAddress().getHostAddress() + "---" + linea); } catch(Exception e){ System.out.println(index+"-errore2: "+ e); break; } } System.out.println(index+"-ho finito di scaricare il file"); dest.close();//chiudo il file out.println("ok"); out.flush(); Out.write(num);//dico a mio padre che ho finito int i; do{ i = Inp.read();//quando me lo dice posso morire }while (i!=num); //Out.close(); //Inp.close(); socket.close();//chiudo la socket TCP con lo slave } //fine try catch (UnknownHostException uhe){ System.out.println(index+"-errore3: "+ uhe); } catch (IOException ioe){ System.out.println(index+"-thread "+ num +" errore4: "+ ioe); }// fine blocco catch catch (Exception ioe){ System.out.println(index+"-errore5: "+ ioe); }// fine blocco catch } }//fine run }//fine classe scaricaFile //hashtable che contiene gli slave class contactSlave{ private KeySlave[] nomiSlave; private Hashtable ht; class campoSlave{ int nTrasm; boolean ackArr; campoSlave(int arg1, boolean arg2){ nTrasm=arg1; ackArr=arg2; } }//fine inner class campoSlave //costruttore contactSlave(){ //SlaveVivi sv = new SlaveVivi(); nomiSlave = ConstantsM.sv.getSlave(); ht =new Hashtable(nomiSlave.length); Constants.prn("-slave presenti nel sistema: "); for (int i=0;i<nomiSlave.length;i++){ //sv.getPort(nomiSlave[i]) if (nomiSlave[i] !=null){ //Constants.prn(nomiSlave[i]); KeySlave k = new KeySlave(nomiSlave[i].ipSlave, nomiSlave[i].portaSlave); campoSlave c = new campoSlave(0, false); ht.put(k, c); campoSlave c1 = (campoSlave)ht.get(k); System.out.println("-metto in hashtable: " + k.ipSlave + " "+ k.portaSlave+"campo: " + c1.ackArr + c1.nTrasm); } } Constants.prn("-fine lista slave"); }//fine costrutore tabella : chiave=string, valore = campoSlave public boolean isPossibleAck(){ //System.out.println("sono dentro a isPossibleAck"); for (Enumeration e = ht.keys() ; e.hasMoreElements() ;) { //System.out.println("" + e); Object d = e.nextElement(); KeySlave ks = (KeySlave)d; //System.out.println("elemento: "+d); campoSlave n = (campoSlave)ht.get((KeySlave)d); //System.out.println("elemento: "+ ks.ipSlave + " " + ks.portaSlave); if (n.ackArr==false && n.nTrasm < Constants.nTrasmMax) return true; } return false; } public boolean isPossibleAck(KeySlave arg){ campoSlave n = (campoSlave)ht.get(arg); if (n.ackArr==false && n.nTrasm < Constants.nTrasmMax){ return true; } return false; } public void incTrasm(KeySlave arg){ campoSlave n = (campoSlave)ht.get(arg); n.nTrasm++; //ht.put(arg,n); } public void setAck(KeySlave arg){ campoSlave n2 = (campoSlave)ht.get(arg); if (n2!=null){ n2.ackArr = true; //ht.put(arg, n2); } } public void removeSlave(KeySlave arg){ System.out.println("-IP da rimuovere" + arg); ht.remove(arg); } public Enumeration takeSlaves(){ return ht.keys(); } public int removeDeath(){ for (Enumeration e = ht.keys() ; e.hasMoreElements() ;) { Object d = e.nextElement(); campoSlave n = (campoSlave)ht.get((KeySlave)d); if (n.ackArr==false){ ht.remove((KeySlave)d); ConstantsM.sv.removeSlave((KeySlave)d); System.out.println("-sono contactslave, rimuovo: " + (KeySlave)d + " da slavevivi"); } } System.out.println("-il num di elementi e': " + ht.size()); return ht.size(); } public boolean isPresent(KeySlave arg){ for (Enumeration e = ht.keys() ; e.hasMoreElements() ;) { campoSlave n = (campoSlave)ht.get((KeySlave)e.nextElement()); if (n!=null) return true; } return false; } }//fine classe contactSlave