//tale thread ha il compito di inoltrare la richiesta ai master noti e di recuperarne i risultati appena diponibili.
//e restituendo a tal punto il risultato al Thread SlaveExeMaster
import java.io.*;
import java.net.*;
import java.util.*;
//alla fine deve dare come risultato dei file da indexS-1.dat in poi!!!! non indexS-0.dat!!!!
public class inoltraQueryMaster extends Thread{
private int index;
private String stringFull;
//private String richiesta;
//private String mail;
private int attesaLivelli;
public inoltraQueryMaster(String[] ric, int arg_index){
super();
index = arg_index;
Constants.prn(index+"-sono dentro a inoltraQueryMaster versione 1.0");
stringFull = ConstantsS.myIP + ":" + (ConstantsS.contactMasterVivi+index) + "\n" + ric[1] + "\n" + ric[2] + "\n" + ric[3];
//int valTtl=;
attesaLivelli=Constants.attesa1liv * ((2*Integer.parseInt(ric[3]))+1);
/*graficamente si vede (x il master):
n.b. il ttl arriva già decrementato!!
0---->0 ramo da attendere
1---->2 rami da attendere
2---->4 rami da attendere
3---->6 rami da attendere
e così via.....
graficamente n.rami = ttl*2
*/
}
public void run(){
int rispQuery = this.inoltraQuery();
Constants.prn(index+"-rispQuery è: " + rispQuery);
ConstantsS.numReperiti = rispQuery;//debug... deve mettere il num max del file a cui è arrivato!!!
}
public int inoltraQuery(){
contactMaster cs = new contactMaster();
byte[] data;
byte[] buffer;
DatagramSocket ds;
DatagramPacket input;
//manda le richieste e cerca di ottenere gli Ack massimo 3 volte
try{
buffer = new byte[65507];
//System.out.println("cerco di creare socket dgram su porta: " + (Constants.contactSlaveUDP+index));
ds = new DatagramSocket(ConstantsS.contactMasterUDP+index);
ds.setSoTimeout(Constants.attesaNRec);//timeout senno' mi blocco x sempre!!!
//ds.setSoTimeout(0);//timeout senno' mi blocco x sempre!!!
data = stringFull.getBytes();
}
catch (Exception e){
System.out.println(index+"-errore creazione socket dgram per contattare i Master: " + e);
return 0;
}
//creo già la socket x quelli che saranno "vivi" in modo che se rispondono oprima che io abbia finito il ciclo dei 3 ack non perdo i messaggi
DatagramSocket dsVivi=null;
try{
dsVivi = new DatagramSocket(ConstantsS.contactMasterVivi+index);
//debug
//dsVivi.setSoTimeout(Constants.attesaNRec);//timeout senno' mi blocco x sempre!!!
dsVivi.setSoTimeout(attesaLivelli);
}
catch (Exception e){
System.out.println(index+"-errore creazione socket dgram per master 'vivi': " + e);
return 0;
}
while (cs.isPossibleAck()){
input = new DatagramPacket(buffer, buffer.length);
int numSend=0;
Enumeration e;
for (e = cs.takeMasters() ; e.hasMoreElements() ;) {
String arg = (String)e.nextElement();
if (cs.isPossibleAck(arg)){
try{
InetAddress dest = InetAddress.getByName(arg);
//debug: porta master???
int porta = ConstantsS.masterPortQuery+2;
System.out.println("i master sono sulla porta: "+ConstantsS.masterPortQuery);
DatagramPacket output = new DatagramPacket(data, data.length, dest, porta);
System.out.println(index+"-sendo su: " + arg + ", porta " + porta);
ds.send(output);
}
catch (UnknownHostException uhe){
cs.removeMaster(arg);
continue;
}
catch (IOException ioe){
//c'è stato un errore di io indipendente dalla rete o dallo slave, è un problema mio
//riprovero' dopo, al prossimo ciclo while
continue;
}
cs.incTrasm(arg);
numSend++;
}
}//fine for
// a questo punto ho mandato le richiesta agli slave vivi, devo prendere gli ack!!
if (numSend==0)
return 0;
long prima=0;
long dopo =0;
long timeRemain = Constants.attesaNRec;
//attendo il numero di richieste che ho mandato!!!
for (int i=0;i<numSend;i++){
try{
if (dopo>0)
timeRemain = timeRemain - (dopo-prima);
dopo=0;
if (timeRemain<=0)
break;
ds.setSoTimeout((int)timeRemain);//timeout senno' mi blocco x sempre!!!
prima = System.currentTimeMillis();
System.out.println(index+"-receive ack");
ds.receive(input);//se non risponde dopo x msec catch eccezione
dopo = System.currentTimeMillis();
String s1 = new String(input.getData(), 0, input.getLength());
System.out.println(index+"----risposta master: " + s1 + "---");
//controllo la risposta dello slave
if (s1.equals("sono pieno")){
String ks = new String(input.getAddress().getHostAddress());
cs.removeMaster(ks);
}
else{
String ks2 = new String(input.getAddress().getHostAddress());
//System.out.println("setto l'ack x lo slave: "+input.getAddress().getHostAddress()+", porta: " + input.getPort());
cs.setAck(ks2);
}
}
catch (InterruptedIOException iioe){
System.out.println(index+"-e' finito il tempo concesso per ricevere gli ack");
//è finito il tempo totale concesso x le risposte!!!
break;//esco dal for e torno nel while
}
catch (IOException ioe){
continue;
}// fine blocco catch
}//fine for
}//fine while
//a tal punto chi mi ha risposto mi ha risposto!!, gli altri sono morti!!!
int numVivi = cs.removeDeath();
if (numVivi==0)
return 0;
//da qui in poi vado a vedere sulla porta UDP per i "vivi"
//a questo punto ho fatto partire tutte le richieste, devo aspettare indefinitamente(?) sulla porta contactSlaveVivi+index che tutti gli Slave mi rispondano
PipedReader[] pipeInp = new PipedReader[numVivi+1];//debug, devo partire da 1(per i file) quindi metto +1 e l'indice 0 non lo considero!!!
PipedWriter[] pipeOut = new PipedWriter[numVivi+1];
boolean[] good = new boolean[numVivi+1];//indice se si è verificato un errore durante la lettura dalla pipe e non lo considero piu'
for (int g=0; g<(numVivi+1); g++)
good[g]=true;
buffer = new byte[65507];
input = new DatagramPacket(buffer, buffer.length);
int numViviVivi=0;//sono quelli che effettivamente finiranno e mi vorranno mandare il file via TCP!!!
long prima=0;
long dopo =0;
long timeRemain = attesaLivelli;//il lavoro degli slave partiti è moolto lungo!!!
try{dsVivi.setSoTimeout((int)timeRemain);}catch (Exception e){}
scaricaFile[] sf = new scaricaFile[numVivi+1];
int h=1;
System.out.println("attendo gli ack degli slave 'vivi' che hanno finito sulla porta: " + dsVivi.getLocalPort());
for (int i=0; i<numVivi; i++){
try{
if (dopo>0)
timeRemain = timeRemain - (dopo-prima);
dopo=0;
if (timeRemain<=0)
break;
dsVivi.setSoTimeout((int)timeRemain);//timeout senno' mi blocco x sempre!!!
prima = System.currentTimeMillis();
input = new DatagramPacket(buffer, buffer.length);
try{
dsVivi.receive(input);//se non risponde dopo x msec catch eccezione
}
catch (InterruptedIOException iioe){
System.out.println(index+"-e' finito il tempo concesso per ricevere gli ack");
break;
}
System.out.println(index+"-ricevuto 2° ack che ha finito da " + input.getAddress().getHostAddress());
dopo = System.currentTimeMillis();
//controllo se lo slave è ritenuto "vivo" dal sistema
//String arg = input.getAddress().getHostAddress();
String ks = new String(input.getAddress().getHostAddress());
String ks1=new String(input.getData(), 0, input.getLength());
if (cs.isPresent(ks) && !ks1.equals("errore")){
System.out.println(index+"-il master era presente tra i vivi, lo considero");
pipeInp[h] = new PipedReader();
pipeOut[h] = new PipedWriter();
System.out.println(index+"-ho impostato le pipe");
InetAddress destAck = input.getAddress();
//la porta TCP è la stessa UDP da cui ha spedito!!
System.out.println(index+"-la porta da cui scaricare è: " + ks1);
//String s2 = new String(input.getData(), 0, input.getLength());
int portScarica = Integer.parseInt(ks1);
data = "ack".getBytes();
DatagramPacket output = new DatagramPacket(data, data.length, destAck, input.getPort());
dsVivi.send(output);
sf[h] = new scaricaFile(h, input, pipeInp[h], pipeOut[h], index, portScarica );
if (sf[h].okPipe){
sf[h].start();
System.out.println(index+"-parte il Thread " + h + "per scaricare da: "+input.getAddress().getHostAddress()+"\nAttendo su Pipe: " + pipeInp[h]);
numViviVivi++;
h++;
ConstantsS.slaveLoad++;
}
}
}
catch (IOException e){
break;
}
}//fine for
h--;//i thread che scaricano sono da 1 ad h compresi!!!
//ho fatto partire tutti i thread che devono scaricare i file
int numCompl=0;//numero dei Thread che hanno spedito correttamente il file
System.out.println(index+"-attendo i thread scaricaFile che sono" + h);
while (numCompl<h){
//System.out.println("attendo i thread scaricafile");
for (int i=1; i<h+1; i++){
//System.out.println("attendo i thread scaricafile da 1 a " + h);
try{
if (good[i]){
//System.out.println("controllo la pipe: "+pipeInp[i]);
if (pipeInp[i].ready()){//il thread i ha finito ed è 'buono'
int conf = pipeInp[i].read();
System.out.println(index+"-il thread ScaricaFile " + i + " ha finito");
pipeOut[i].write(conf);
//pipeInp[i].close();//???????????
//pipeOut[i].close();
numCompl++;
good[i]=false;
ConstantsS.slaveLoad--;
}
}
}
catch(Exception e){
System.out.println(index+"-inoltra query, sto aspettando che finiscano!!errore1:"+e);
//good[i]=false;
//numCompl++;
//continue;
}
}
}
//hanno finito tutti di scaricare!!!
//String nomeFile = eliminaRidondanze(numViviVivi);
//System.out.println("il file da mandare e' in : " + nomeFile);
return numCompl;
}//fine metodo inoltraQuery
//}
}//fine class principale
//class che scarica il file dallo slave
class scaricaFile extends Thread{
private DatagramPacket msg;
int num;
PipedReader Inp;
PipedWriter Out;
public boolean okPipe=true;
private int index;
private int port;
scaricaFile(int i,DatagramPacket input, PipedReader pipeInp, PipedWriter pipeOut, int arg_index, int arg_port ){
super();
index=arg_index;
msg=input;
num=i;
if (arg_port==0)
okPipe=false;
else
port=arg_port;
try{
Inp=new PipedReader(pipeOut);
Out=new PipedWriter(pipeInp);
}
catch(Exception e) {
okPipe=false;
}
}//fine costruttore
public void run(){
if (okPipe){
BufferedReader inp=null;
PrintWriter out=null;
//Socket socket=null;
PrintWriter dest=null;
try{
//creo connessione tcp
System.out.println(index+"-"+num+"devo scaricare il file dallo Slave e mandarlo in file, sono il thread x "+msg.getAddress().getHostAddress() + "-"+msg.getPort());
Constants.prn(index+"-"+num+"arrivata porta: "+ port);
Socket socket = new Socket(msg.getAddress().getHostAddress(), port);
//debug
Constants.prn(index+"-"+num+"connessione riuscita");
//try{socket.setSoTimeout(0);}catch (Exception e){}
inp = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
out.println("ok");
out.flush();
dest = new PrintWriter(new FileWriter(index + "-"+ num + ".dat"));
//debug
socket.setSoTimeout(Constants.attesaRec);
while (!inp.ready()){}//attesa attiva risposta server
//while (inp.ready()){// acquisizione risposta server fino alla fine, comprensiva anche di più linee!!
Constants.prn(index+"-"+num+"comincio a leggere il file");
while (true){// acquisizione risposta server fino alla fine, comprensiva anche di più linee!!
try {
//System.out.println("vado in attesa di una linea");
String linea = inp.readLine();
if (linea==null){
System.out.println(index+"-"+num+"FINE FILE");
break;
}
dest.println(linea);
System.out.println(index+"-"+num+"-"+msg.getAddress().getHostAddress() + "---" + linea);
}
catch(Exception e){
System.out.println(index+"-"+num+"errore2: "+ e);
break;
}
}
dest.close();//chiudo il file
//out.println("ok");
//out.flush();
System.out.println(index+"-"+num+"dico a mio padre che ho finito e attendo");
Out.write(num);//dico a mio padre che ho finito
int i;
//do{
i = Inp.read();//quando me lo dice posso morire
//}while (i!=num);
System.out.println(index+"-"+num+"mi ha detto che posso morire, muoio");
//Out.close();
//Inp.close();
socket.close();//chiudo la socket TCP con lo slave
} //fine try
catch (UnknownHostException uhe){
System.out.println(index+"-"+num+"errore3: "+ uhe);
}
catch (IOException ioe){
System.out.println(index+"-"+num+"thread "+ num +" errore4: "+ ioe);
}// fine blocco catch
catch (Exception ioe){
System.out.println(index+"-"+num+"errore5: "+ ioe);
}// fine blocco catch
}
}//fine run
}//fine classe scaricaFile
//hashtable che contiene gli Master
class contactMaster{
private String[] nomiMaster;
private Hashtable ht;
class campoMaster{
int nTrasm;
boolean ackArr;
campoMaster(int arg1, boolean arg2){
nTrasm=arg1;
ackArr=arg2;
}
}//fine inner class campoMaster
//costruttore
contactMaster(){
//debug : lista da file
nomiMaster = this.getMasterFile();
ht =new Hashtable(nomiMaster.length);
Constants.prn("Master presenti nel sistema: ");
for (int i=0;i<nomiMaster.length;i++){
if (nomiMaster[i] !=null){
campoMaster c = new campoMaster(0, false);
ht.put(nomiMaster[i], c);
campoMaster c1 = (campoMaster)ht.get(nomiMaster[i]);
System.out.println("metto in hashtable: " + nomiMaster[i] + ",campo: " + c1.ackArr + "-" + c1.nTrasm);
}
}
Constants.prn("fine lista master");
}//fine costrutore tabella : chiave=string, valore = campoMaster
public boolean isPossibleAck(){
//System.out.println("sono dentro a isPossibleAck");
for (Enumeration e = ht.keys() ; e.hasMoreElements() ;) {
Object d = e.nextElement();
String ks = (String)d;
campoMaster n = (campoMaster)ht.get(ks);
if (n.ackArr==false && n.nTrasm < Constants.nTrasmMax)
return true;
}
return false;
}
public boolean isPossibleAck(String arg){
campoMaster n = (campoMaster)ht.get(arg);
if (n.ackArr==false && n.nTrasm < Constants.nTrasmMax){
return true;
}
return false;
}
public void incTrasm(String arg){
campoMaster n = (campoMaster)ht.get(arg);
n.nTrasm++;
//ht.put(arg,n);
}
public void setAck(String arg){
campoMaster n2 = (campoMaster)ht.get(arg);
if (n2!=null){
n2.ackArr = true;
//ht.put(arg, n2);
}
}
public void removeMaster(String arg){
System.out.println("IP da rimuovere" + arg);
ht.remove(arg);
}
public Enumeration takeMasters(){
return ht.keys();
}
public int removeDeath(){
//debug
for (Enumeration e = ht.keys() ; e.hasMoreElements() ;) {
Object d = e.nextElement();
campoMaster n = (campoMaster)ht.get((String)d);
if (n.ackArr==false){
ht.remove((String)d);
System.out.println("rimuovo: " + (String)d);
}
}
System.out.println("il num di elementi e': " + ht.size());
return ht.size();
}
public boolean isPresent(String arg){
for (Enumeration e = ht.keys() ; e.hasMoreElements() ;) {
campoMaster n = (campoMaster)ht.get((String)e.nextElement());
if (n!=null)
return true;
}
return false;
}
private String[] getMasterFile(){
Vector temp=new Vector();
String[] temp2;
try {
FileReader fis = new FileReader("listaMaster.txt");
BufferedReader inp = new BufferedReader(fis);
int i=0;
String line;
while ((line = inp.readLine())!=null){
temp.addElement(line);
i++;
}
System.out.println("fine input");
fis.close();
temp2=new String[i];
temp2 = (String[])temp.toArray(temp2);
return temp2;
}
catch ( Exception e){
return null;
}
}
}//fine classe contactMaster