cvs server: Diffing . Index: btlaunchmany.py =================================================================== RCS file: /opt/src/cvsroot/bittorrent/btlaunchmany.py,v retrieving revision 1.1.1.3 retrieving revision 1.1.1.3.4.1 diff -c -r1.1.1.3 -r1.1.1.3.4.1 *** btlaunchmany.py 2003/09/30 20:02:44 1.1.1.3 --- btlaunchmany.py 2004/04/15 17:45:35 1.1.1.3.4.1 *************** *** 7,12 **** --- 7,17 ---- # fmttime and fmtsize stolen from btdownloadcurses. # see LICENSE.txt for license information + + from BitTorrent import RawServer + RawServer.listen_thread=RawServer._listen_thread() + + from BitTorrent.download import download from threading import Thread, Event, Lock from os import listdir Index: btlaunchmanycurses.py =================================================================== RCS file: /opt/src/cvsroot/bittorrent/btlaunchmanycurses.py,v retrieving revision 1.1.1.2 retrieving revision 1.1.1.2.4.1 diff -c -r1.1.1.2 -r1.1.1.2.4.1 *** btlaunchmanycurses.py 2003/09/30 20:02:44 1.1.1.2 --- btlaunchmanycurses.py 2004/04/15 11:41:33 1.1.1.2.4.1 *************** *** 7,12 **** --- 7,17 ---- # fmttime and fmtsize stolen from btdownloadcurses. # see LICENSE.txt for license information + + from BitTorrent import RawServer + RawServer.listen_thread=RawServer._listen_thread() + + from BitTorrent.download import download from threading import Thread, Event, Lock from os import listdir cvs server: Diffing BitTorrent Index: BitTorrent/RawServer.py =================================================================== RCS file: /opt/src/cvsroot/bittorrent/BitTorrent/RawServer.py,v retrieving revision 1.1.1.9 retrieving revision 1.1.1.9.2.9 diff -c -r1.1.1.9 -r1.1.1.9.2.9 *** BitTorrent/RawServer.py 2004/04/13 02:22:07 1.1.1.9 --- BitTorrent/RawServer.py 2004/06/15 17:31:32 1.1.1.9.2.9 *************** *** 19,24 **** --- 19,26 ---- all = POLLIN | POLLOUT + listen_thread=None + class SingleSocket: def __init__(self, raw_server, sock, handler): self.raw_server = raw_server *************** *** 85,90 **** --- 87,93 ---- self.timeout = timeout self.poll = poll() # {socket: SingleSocket} + self.server=None self.single_sockets = {} self.dead_from_write = [] self.doneflag = doneflag *************** *** 109,128 **** if k.socket is not None: self._close_socket(k) ! def bind(self, port, bind = '', reuse = False): self.bindaddr = bind ! server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ! if reuse: ! server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) ! server.setblocking(0) ! try: ! server.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, 32) ! except: ! pass ! server.bind((bind, port)) ! server.listen(5) ! self.poll.register(server, POLLIN) ! self.server = server def start_connection(self, dns, handler = None): if handler is None: --- 112,146 ---- if k.socket is not None: self._close_socket(k) ! def bind(self, port, bind = '', reuse = False, download_id = None): self.bindaddr = bind ! if not listen_thread : ! server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ! if reuse: ! server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) ! server.setblocking(0) ! try: ! server.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, 32) ! except: ! pass ! server.bind((bind, port)) ! server.listen(5) ! self.poll.register(server, POLLIN) ! self.server = server ! else : ! assert download_id ! import os ! import Queue ! r,w=os.pipe() ! self.socket_notificacion=r ! self.poll.register(r, POLLIN) ! self.cola_notificacion=Queue.Queue() ! listen_thread.cola_notif.put((bind,port,download_id,self.cola_notificacion,w)) ! os.write(listen_thread.socket_notificacion,"A") ! os.read(r,1) ! e=self.cola_notificacion.get() ! if e : ! raise e def start_connection(self, dns, handler = None): if handler is None: *************** *** 143,149 **** def handle_events(self, events): for sock, event in events: ! if sock == self.server.fileno(): if event & (POLLHUP | POLLERR) != 0: self.poll.unregister(self.server) self.server.close() --- 161,186 ---- def handle_events(self, events): for sock, event in events: ! if listen_thread and (sock==self.socket_notificacion) : ! import os ! os.read(sock,1) ! newsock, data = self.cola_notificacion.get() ! try : ! newsock.setblocking(0) ! if len(self.single_sockets) >= self.maxconnects: ! newsock.close() ! continue ! nss = SingleSocket(self, newsock, self.handler) ! self.single_sockets[newsock.fileno()] = nss ! self.poll.register(newsock, POLLIN) ! self.handler.external_connection_made(nss) ! nss.connected = True ! nss.last_hit = time() ! nss.handler.data_came_in(nss, data) ! except socket.error: ! print_exc() ! sleep(1) ! elif self.server and (sock == self.server.fileno()): if event & (POLLHUP | POLLERR) != 0: self.poll.unregister(self.server) self.server.close() *************** *** 216,222 **** try: func() except KeyboardInterrupt: ! print_exc() return except: if self.noisy: --- 253,259 ---- try: func() except KeyboardInterrupt: ! irint_exc() return except: if self.noisy: *************** *** 253,259 **** finally: for ss in self.single_sockets.values(): ss.close() ! self.server.close() def _close_dead(self): while len(self.dead_from_write) > 0: --- 290,297 ---- finally: for ss in self.single_sockets.values(): ss.close() ! if self.server : ! self.server.close() def _close_dead(self): while len(self.dead_from_write) > 0: *************** *** 270,275 **** --- 308,402 ---- del self.single_sockets[sock] s.socket = None s.handler.connection_lost(s) + + + class _listen_thread : + def __init__(self) : + import select + import Queue + import threading + import os + + self._suscripciones_socket={} + self._suscripciones_download_id={} + self._sockets={} + self.poll=select.poll() + self.cola_notif=Queue.Queue() + r,w=os.pipe() + self.socket_notificacion=w + self.__socket_notificacion=r + self.poll.register(r, POLLIN) + t=Thread(target = self.listen_forever, name="Thread que procesa las nuevas conexiones") + t.setDaemon(True) + t.start() + + def listen_forever(self) : + import time + import os + + while True : + p=self.poll.poll(3600) + for i in p : + s=i[0] + if s==self.__socket_notificacion : + estado=None + os.read(s,1) + ip,puerto,download_id,cola_notificacion,socket_notificacion=self.cola_notif.get() + valor=self._suscripciones_socket.setdefault((ip,puerto),[]) # Objeto MUTABLE + + if not len(valor) : # Es el primero + try : + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.setblocking(0) + try: + server.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, 32) + except: + pass + server.bind((ip, puerto)) + server.listen(5) + self.poll.register(server, POLLIN) + self._sockets[server.fileno()]=server + except : + import sys + estado=sys.exc_info()[1] + else : + valor.append(download_id) + self._suscripciones_download_id[download_id]=(cola_notificacion,socket_notificacion) + + cola_notificacion.put(estado) + del estado + os.write(socket_notificacion,"A") + + continue + + s=self._sockets[s].accept()[0] + timeout_original=s.gettimeout() + timeout=time.time()+5 + data="" + while len(data)<48 : + t=timeout-time.time() + if t<=0 : break + s.settimeout(t) + try : + a=s.recv(48) + except : + break + if not a : break + data+=a + if len(data)!=48 : + s.close() + continue + s.settimeout(timeout_original) + download_id=data[28:48] + valor=self._suscripciones_download_id.get(download_id) + if valor : + valor[0].put((s,data)) + import os + os.write(valor[1],"A") + else : + s.close() + # everything below is for testing Index: BitTorrent/download.py =================================================================== RCS file: /opt/src/cvsroot/bittorrent/BitTorrent/download.py,v retrieving revision 1.1.1.13 retrieving revision 1.1.1.13.2.1 diff -c -r1.1.1.13 -r1.1.1.13.2.1 *** BitTorrent/download.py 2004/04/13 02:22:07 1.1.1.13 --- BitTorrent/download.py 2004/04/15 13:18:36 1.1.1.13.2.1 *************** *** 144,149 **** --- 144,150 ---- makedirs(f) info = response['info'] + infohash = sha(bencode(info)).digest() if info.has_key('length'): file_length = info['length'] file = filefunc(info['name'], file_length, config['saveas'], False) *************** *** 234,240 **** e = 'maxport less than minport - no ports to check' for listen_port in xrange(config['minport'], config['maxport'] + 1): try: ! rawserver.bind(listen_port, config['bind']) break except socketerror, e: pass --- 235,241 ---- e = 'maxport less than minport - no ports to check' for listen_port in xrange(config['minport'], config['maxport'] + 1): try: ! rawserver.bind(listen_port, config['bind'], reuse=True, download_id=infohash) break except socketerror, e: pass *************** *** 266,272 **** ratemeasure.data_came_in) connecter = Connecter(make_upload, downloader, choker, len(pieces), upmeasure, config['max_upload_rate'] * 1024, rawserver.add_task) - infohash = sha(bencode(info)).digest() encoder = Encoder(connecter, rawserver, myid, config['max_message_length'], rawserver.add_task, config['keepalive_interval'], infohash, config['max_initiate']) --- 267,272 ---- cvs server: Diffing osx cvs server: Diffing osx/BitTorrent.pbproj cvs server: Diffing osx/Dutch.lproj cvs server: Diffing osx/Dutch.lproj/DLWindow.nib cvs server: Diffing osx/Dutch.lproj/MainMenu.nib cvs server: Diffing osx/Dutch.lproj/Metainfo.nib cvs server: Diffing osx/Dutch.lproj/Preferences.nib cvs server: Diffing osx/English.lproj cvs server: Diffing osx/English.lproj/DLWindow.nib cvs server: Diffing osx/English.lproj/MainMenu.nib cvs server: Diffing osx/English.lproj/Metainfo.nib cvs server: Diffing osx/English.lproj/Preferences.nib cvs server: Diffing osx/French.lproj cvs server: Diffing osx/French.lproj/DLWindow.nib cvs server: Diffing osx/French.lproj/MainMenu.nib cvs server: Diffing osx/French.lproj/Metainfo.nib