//tale thread ha il compito di inoltrare la richiesta ai master noti e di recuperarne i risultati appena diponibili. //e restituendo a tal punto il risultato al Thread SlaveExeMaster import java.io.*; import java.net.*; import java.util.*; //alla fine deve dare come risultato dei file da indexS-1.dat in poi!!!! non indexS-0.dat!!!! public class inoltraQueryMaster extends Thread{ private int index; private String stringFull; //private String richiesta; //private String mail; private int attesaLivelli; public inoltraQueryMaster(String[] ric, int arg_index){ super(); index = arg_index; Constants.prn(index+"-sono dentro a inoltraQueryMaster versione 1.0"); stringFull = ConstantsS.myIP + ":" + (ConstantsS.contactMasterVivi+index) + "\n" + ric[1] + "\n" + ric[2] + "\n" + ric[3]; //int valTtl=; attesaLivelli=Constants.attesa1liv * ((2*Integer.parseInt(ric[3]))+1); /*graficamente si vede (x il master): n.b. il ttl arriva già decrementato!! 0---->0 ramo da attendere 1---->2 rami da attendere 2---->4 rami da attendere 3---->6 rami da attendere e così via..... graficamente n.rami = ttl*2 */ } public void run(){ int rispQuery = this.inoltraQuery(); Constants.prn(index+"-rispQuery è: " + rispQuery); ConstantsS.numReperiti = rispQuery;//debug... deve mettere il num max del file a cui è arrivato!!! } public int inoltraQuery(){ contactMaster cs = new contactMaster(); byte[] data; byte[] buffer; DatagramSocket ds; DatagramPacket input; //manda le richieste e cerca di ottenere gli Ack massimo 3 volte try{ buffer = new byte[65507]; //System.out.println("cerco di creare socket dgram su porta: " + (Constants.contactSlaveUDP+index)); ds = new DatagramSocket(ConstantsS.contactMasterUDP+index); ds.setSoTimeout(Constants.attesaNRec);//timeout senno' mi blocco x sempre!!! //ds.setSoTimeout(0);//timeout senno' mi blocco x sempre!!! data = stringFull.getBytes(); } catch (Exception e){ System.out.println(index+"-errore creazione socket dgram per contattare i Master: " + e); return 0; } //creo già la socket x quelli che saranno "vivi" in modo che se rispondono oprima che io abbia finito il ciclo dei 3 ack non perdo i messaggi DatagramSocket dsVivi=null; try{ dsVivi = new DatagramSocket(ConstantsS.contactMasterVivi+index); //debug //dsVivi.setSoTimeout(Constants.attesaNRec);//timeout senno' mi blocco x sempre!!! dsVivi.setSoTimeout(attesaLivelli); } catch (Exception e){ System.out.println(index+"-errore creazione socket dgram per master 'vivi': " + e); return 0; } while (cs.isPossibleAck()){ input = new DatagramPacket(buffer, buffer.length); int numSend=0; Enumeration e; for (e = cs.takeMasters() ; e.hasMoreElements() ;) { String arg = (String)e.nextElement(); if (cs.isPossibleAck(arg)){ try{ InetAddress dest = InetAddress.getByName(arg); //debug: porta master??? int porta = ConstantsS.masterPortQuery+2; System.out.println("i master sono sulla porta: "+ConstantsS.masterPortQuery); DatagramPacket output = new DatagramPacket(data, data.length, dest, porta); System.out.println(index+"-sendo su: " + arg + ", porta " + porta); ds.send(output); } catch (UnknownHostException uhe){ cs.removeMaster(arg); continue; } catch (IOException ioe){ //c'è stato un errore di io indipendente dalla rete o dallo slave, è un problema mio //riprovero' dopo, al prossimo ciclo while continue; } cs.incTrasm(arg); numSend++; } }//fine for // a questo punto ho mandato le richiesta agli slave vivi, devo prendere gli ack!! if (numSend==0) return 0; long prima=0; long dopo =0; long timeRemain = Constants.attesaNRec; //attendo il numero di richieste che ho mandato!!! for (int i=0;i<numSend;i++){ try{ if (dopo>0) timeRemain = timeRemain - (dopo-prima); dopo=0; if (timeRemain<=0) break; ds.setSoTimeout((int)timeRemain);//timeout senno' mi blocco x sempre!!! prima = System.currentTimeMillis(); System.out.println(index+"-receive ack"); ds.receive(input);//se non risponde dopo x msec catch eccezione dopo = System.currentTimeMillis(); String s1 = new String(input.getData(), 0, input.getLength()); System.out.println(index+"----risposta master: " + s1 + "---"); //controllo la risposta dello slave if (s1.equals("sono pieno")){ String ks = new String(input.getAddress().getHostAddress()); cs.removeMaster(ks); } else{ String ks2 = new String(input.getAddress().getHostAddress()); //System.out.println("setto l'ack x lo slave: "+input.getAddress().getHostAddress()+", porta: " + input.getPort()); cs.setAck(ks2); } } catch (InterruptedIOException iioe){ System.out.println(index+"-e' finito il tempo concesso per ricevere gli ack"); //è finito il tempo totale concesso x le risposte!!! break;//esco dal for e torno nel while } catch (IOException ioe){ continue; }// fine blocco catch }//fine for }//fine while //a tal punto chi mi ha risposto mi ha risposto!!, gli altri sono morti!!! int numVivi = cs.removeDeath(); if (numVivi==0) return 0; //da qui in poi vado a vedere sulla porta UDP per i "vivi" //a questo punto ho fatto partire tutte le richieste, devo aspettare indefinitamente(?) sulla porta contactSlaveVivi+index che tutti gli Slave mi rispondano PipedReader[] pipeInp = new PipedReader[numVivi+1];//debug, devo partire da 1(per i file) quindi metto +1 e l'indice 0 non lo considero!!! PipedWriter[] pipeOut = new PipedWriter[numVivi+1]; boolean[] good = new boolean[numVivi+1];//indice se si è verificato un errore durante la lettura dalla pipe e non lo considero piu' for (int g=0; g<(numVivi+1); g++) good[g]=true; buffer = new byte[65507]; input = new DatagramPacket(buffer, buffer.length); int numViviVivi=0;//sono quelli che effettivamente finiranno e mi vorranno mandare il file via TCP!!! long prima=0; long dopo =0; long timeRemain = attesaLivelli;//il lavoro degli slave partiti è moolto lungo!!! try{dsVivi.setSoTimeout((int)timeRemain);}catch (Exception e){} scaricaFile[] sf = new scaricaFile[numVivi+1]; int h=1; System.out.println("attendo gli ack degli slave 'vivi' che hanno finito sulla porta: " + dsVivi.getLocalPort()); for (int i=0; i<numVivi; i++){ try{ if (dopo>0) timeRemain = timeRemain - (dopo-prima); dopo=0; if (timeRemain<=0) break; dsVivi.setSoTimeout((int)timeRemain);//timeout senno' mi blocco x sempre!!! prima = System.currentTimeMillis(); input = new DatagramPacket(buffer, buffer.length); try{ dsVivi.receive(input);//se non risponde dopo x msec catch eccezione } catch (InterruptedIOException iioe){ System.out.println(index+"-e' finito il tempo concesso per ricevere gli ack"); break; } System.out.println(index+"-ricevuto 2° ack che ha finito da " + input.getAddress().getHostAddress()); dopo = System.currentTimeMillis(); //controllo se lo slave è ritenuto "vivo" dal sistema //String arg = input.getAddress().getHostAddress(); String ks = new String(input.getAddress().getHostAddress()); String ks1=new String(input.getData(), 0, input.getLength()); if (cs.isPresent(ks) && !ks1.equals("errore")){ System.out.println(index+"-il master era presente tra i vivi, lo considero"); pipeInp[h] = new PipedReader(); pipeOut[h] = new PipedWriter(); System.out.println(index+"-ho impostato le pipe"); InetAddress destAck = input.getAddress(); //la porta TCP è la stessa UDP da cui ha spedito!! System.out.println(index+"-la porta da cui scaricare è: " + ks1); //String s2 = new String(input.getData(), 0, input.getLength()); int portScarica = Integer.parseInt(ks1); data = "ack".getBytes(); DatagramPacket output = new DatagramPacket(data, data.length, destAck, input.getPort()); dsVivi.send(output); sf[h] = new scaricaFile(h, input, pipeInp[h], pipeOut[h], index, portScarica ); if (sf[h].okPipe){ sf[h].start(); System.out.println(index+"-parte il Thread " + h + "per scaricare da: "+input.getAddress().getHostAddress()+"\nAttendo su Pipe: " + pipeInp[h]); numViviVivi++; h++; ConstantsS.slaveLoad++; } } } catch (IOException e){ break; } }//fine for h--;//i thread che scaricano sono da 1 ad h compresi!!! //ho fatto partire tutti i thread che devono scaricare i file int numCompl=0;//numero dei Thread che hanno spedito correttamente il file System.out.println(index+"-attendo i thread scaricaFile che sono" + h); while (numCompl<h){ //System.out.println("attendo i thread scaricafile"); for (int i=1; i<h+1; i++){ //System.out.println("attendo i thread scaricafile da 1 a " + h); try{ if (good[i]){ //System.out.println("controllo la pipe: "+pipeInp[i]); if (pipeInp[i].ready()){//il thread i ha finito ed è 'buono' int conf = pipeInp[i].read(); System.out.println(index+"-il thread ScaricaFile " + i + " ha finito"); pipeOut[i].write(conf); //pipeInp[i].close();//??????????? //pipeOut[i].close(); numCompl++; good[i]=false; ConstantsS.slaveLoad--; } } } catch(Exception e){ System.out.println(index+"-inoltra query, sto aspettando che finiscano!!errore1:"+e); //good[i]=false; //numCompl++; //continue; } } } //hanno finito tutti di scaricare!!! //String nomeFile = eliminaRidondanze(numViviVivi); //System.out.println("il file da mandare e' in : " + nomeFile); return numCompl; }//fine metodo inoltraQuery //} }//fine class principale //class che scarica il file dallo slave class scaricaFile extends Thread{ private DatagramPacket msg; int num; PipedReader Inp; PipedWriter Out; public boolean okPipe=true; private int index; private int port; scaricaFile(int i,DatagramPacket input, PipedReader pipeInp, PipedWriter pipeOut, int arg_index, int arg_port ){ super(); index=arg_index; msg=input; num=i; if (arg_port==0) okPipe=false; else port=arg_port; try{ Inp=new PipedReader(pipeOut); Out=new PipedWriter(pipeInp); } catch(Exception e) { okPipe=false; } }//fine costruttore public void run(){ if (okPipe){ BufferedReader inp=null; PrintWriter out=null; //Socket socket=null; PrintWriter dest=null; try{ //creo connessione tcp System.out.println(index+"-"+num+"devo scaricare il file dallo Slave e mandarlo in file, sono il thread x "+msg.getAddress().getHostAddress() + "-"+msg.getPort()); Constants.prn(index+"-"+num+"arrivata porta: "+ port); Socket socket = new Socket(msg.getAddress().getHostAddress(), port); //debug Constants.prn(index+"-"+num+"connessione riuscita"); //try{socket.setSoTimeout(0);}catch (Exception e){} inp = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); out.println("ok"); out.flush(); dest = new PrintWriter(new FileWriter(index + "-"+ num + ".dat")); //debug socket.setSoTimeout(Constants.attesaRec); while (!inp.ready()){}//attesa attiva risposta server //while (inp.ready()){// acquisizione risposta server fino alla fine, comprensiva anche di più linee!! Constants.prn(index+"-"+num+"comincio a leggere il file"); while (true){// acquisizione risposta server fino alla fine, comprensiva anche di più linee!! try { //System.out.println("vado in attesa di una linea"); String linea = inp.readLine(); if (linea==null){ System.out.println(index+"-"+num+"FINE FILE"); break; } dest.println(linea); System.out.println(index+"-"+num+"-"+msg.getAddress().getHostAddress() + "---" + linea); } catch(Exception e){ System.out.println(index+"-"+num+"errore2: "+ e); break; } } dest.close();//chiudo il file //out.println("ok"); //out.flush(); System.out.println(index+"-"+num+"dico a mio padre che ho finito e attendo"); Out.write(num);//dico a mio padre che ho finito int i; //do{ i = Inp.read();//quando me lo dice posso morire //}while (i!=num); System.out.println(index+"-"+num+"mi ha detto che posso morire, muoio"); //Out.close(); //Inp.close(); socket.close();//chiudo la socket TCP con lo slave } //fine try catch (UnknownHostException uhe){ System.out.println(index+"-"+num+"errore3: "+ uhe); } catch (IOException ioe){ System.out.println(index+"-"+num+"thread "+ num +" errore4: "+ ioe); }// fine blocco catch catch (Exception ioe){ System.out.println(index+"-"+num+"errore5: "+ ioe); }// fine blocco catch } }//fine run }//fine classe scaricaFile //hashtable che contiene gli Master class contactMaster{ private String[] nomiMaster; private Hashtable ht; class campoMaster{ int nTrasm; boolean ackArr; campoMaster(int arg1, boolean arg2){ nTrasm=arg1; ackArr=arg2; } }//fine inner class campoMaster //costruttore contactMaster(){ //debug : lista da file nomiMaster = this.getMasterFile(); ht =new Hashtable(nomiMaster.length); Constants.prn("Master presenti nel sistema: "); for (int i=0;i<nomiMaster.length;i++){ if (nomiMaster[i] !=null){ campoMaster c = new campoMaster(0, false); ht.put(nomiMaster[i], c); campoMaster c1 = (campoMaster)ht.get(nomiMaster[i]); System.out.println("metto in hashtable: " + nomiMaster[i] + ",campo: " + c1.ackArr + "-" + c1.nTrasm); } } Constants.prn("fine lista master"); }//fine costrutore tabella : chiave=string, valore = campoMaster public boolean isPossibleAck(){ //System.out.println("sono dentro a isPossibleAck"); for (Enumeration e = ht.keys() ; e.hasMoreElements() ;) { Object d = e.nextElement(); String ks = (String)d; campoMaster n = (campoMaster)ht.get(ks); if (n.ackArr==false && n.nTrasm < Constants.nTrasmMax) return true; } return false; } public boolean isPossibleAck(String arg){ campoMaster n = (campoMaster)ht.get(arg); if (n.ackArr==false && n.nTrasm < Constants.nTrasmMax){ return true; } return false; } public void incTrasm(String arg){ campoMaster n = (campoMaster)ht.get(arg); n.nTrasm++; //ht.put(arg,n); } public void setAck(String arg){ campoMaster n2 = (campoMaster)ht.get(arg); if (n2!=null){ n2.ackArr = true; //ht.put(arg, n2); } } public void removeMaster(String arg){ System.out.println("IP da rimuovere" + arg); ht.remove(arg); } public Enumeration takeMasters(){ return ht.keys(); } public int removeDeath(){ //debug for (Enumeration e = ht.keys() ; e.hasMoreElements() ;) { Object d = e.nextElement(); campoMaster n = (campoMaster)ht.get((String)d); if (n.ackArr==false){ ht.remove((String)d); System.out.println("rimuovo: " + (String)d); } } System.out.println("il num di elementi e': " + ht.size()); return ht.size(); } public boolean isPresent(String arg){ for (Enumeration e = ht.keys() ; e.hasMoreElements() ;) { campoMaster n = (campoMaster)ht.get((String)e.nextElement()); if (n!=null) return true; } return false; } private String[] getMasterFile(){ Vector temp=new Vector(); String[] temp2; try { FileReader fis = new FileReader("listaMaster.txt"); BufferedReader inp = new BufferedReader(fis); int i=0; String line; while ((line = inp.readLine())!=null){ temp.addElement(line); i++; } System.out.println("fine input"); fis.close(); temp2=new String[i]; temp2 = (String[])temp.toArray(temp2); return temp2; } catch ( Exception e){ return null; } } }//fine classe contactMaster