# -*- Mode: Python; tab-width: 4 -*-
# $Id: asynchat.py,v 1.1 1999/01/12 20:19:24 guido Exp $
# Author: Sam Rushing

# ======================================================================
# Copyright 1996 by Sam Rushing
#
#                         All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================

import asyncore
import socket
import string

# This class adds support for 'chat' style protocols - where one side
# sends a 'command', and the other sends a response (examples would be
# the common internet protocols - smtp, nntp, ftp, etc..).

# The handle_read() method looks at the input stream for the current
# 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
# for multi-line output), calling self.found_terminator() on its
# receipt.

# for example:
# Say you build an async nntp client using this class.  At the start
# of the connection, you'll have self.terminator set to '\r\n', in
# order to process the single-line greeting.  Just before issuing a
# 'LIST' command you'll set it to '\r\n.\r\n'.
# The output of the LIST command will be accumulated (using your own
# 'collect_incoming_data' method) up to the terminator, and then
# control will be returned to you - by calling your
# self.found_terminator() method

class async_chat (asyncore.dispatcher):
    """This is an abstract class.  You must derive from this class, and add
    the two methods collect_incoming_data() and found_terminator()"""

    # these are overridable defaults
    ac_in_buffer_size = 4096
    ac_out_buffer_size = 4096

    # ==================================================
    # support for push mode.
    # ==================================================
    # When true, initiate_send() does not refill the outgoing buffer
    # from the producer fifo.
    _push_mode = 0

    def __init__ (self, conn=None):
        """Create empty input/output buffers and an empty producer
        fifo, then initialise the asyncore.dispatcher base with conn."""
        self.ac_in_buffer = ''
        self.ac_out_buffer = ''
        self.producer_fifo = fifo()
        asyncore.dispatcher.__init__ (self, conn)

    def set_terminator (self, term):
        "Set the input delimiter.  Can be a fixed string of any length, or None"
        # None is stored as the empty string, which handle_read() treats
        # as "no terminator: hand over everything that arrives".
        if term is None:
            self.terminator = ''
        else:
            self.terminator = term

    def get_terminator (self):
        "Return the current input delimiter ('' when there is none)."
        return self.terminator

    # grab some more data from the socket,
    # throw it to the collector method,
    # check for the terminator,
    # if found, transition to the next state.

    def handle_read (self):
        """Read up to ac_in_buffer_size bytes and dispatch them.

        Data before a terminator goes to self.collect_incoming_data();
        each complete terminator triggers self.found_terminator().  A
        trailing partial terminator is kept in self.ac_in_buffer until
        more data arrives.
        """
        try:
            data = self.recv (self.ac_in_buffer_size)
        except socket.error:
            import sys
            # sys.exc_info() replaces the deprecated, non-thread-safe
            # sys.exc_type / sys.exc_value / sys.exc_traceback globals.
            # NOTE(review): the three-argument handle_error() call is kept
            # from the original - confirm it matches the asyncore in use.
            exc_type, exc_value, exc_traceback = sys.exc_info()
            self.handle_error (exc_type, exc_value, exc_traceback)
            return

        self.ac_in_buffer = self.ac_in_buffer + data

        # Continue to search for self.terminator in self.ac_in_buffer,
        # while calling self.collect_incoming_data.  The while loop
        # is necessary because we might read several data+terminator
        # combos with a single recv(1024).

        while self.ac_in_buffer:
            # Re-read the terminator on every pass: found_terminator()
            # is allowed to change it.
            terminator = self.get_terminator()
            terminator_len = len(terminator)
            # 4 cases:
            # 1) end of buffer matches terminator exactly:
            #    collect data, transition
            # 2) end of buffer matches some prefix:
            #    collect data to the prefix
            # 3) end of buffer does not match any prefix:
            #    collect data
            # 4) no terminator, just collect the data
            if terminator:
                index = self.ac_in_buffer.find (terminator)
                if index != -1:
                    # we found the terminator
                    self.collect_incoming_data (self.ac_in_buffer[:index])
                    self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
                    # This does the Right Thing if the terminator is changed here.
                    self.found_terminator()
                else:
                    # check for a prefix of the terminator
                    index = find_prefix_at_end (self.ac_in_buffer, terminator)
                    if index:
                        # we found a prefix, collect up to the prefix
                        self.collect_incoming_data (self.ac_in_buffer[:-index])
                        self.ac_in_buffer = self.ac_in_buffer[-index:]
                        # the rest of the terminator may be in the next read
                        break
                    else:
                        # no prefix, collect it all
                        self.collect_incoming_data (self.ac_in_buffer)
                        self.ac_in_buffer = ''
            else:
                # no terminator, collect it all
                self.collect_incoming_data (self.ac_in_buffer)
                self.ac_in_buffer = ''

    def handle_write (self):
        "Try to send pending output."
        self.initiate_send ()

    def handle_close (self):
        "Close the channel."
        self.close()

    def push (self, data):
        "Queue the string data (wrapped in a simple_producer) and try to send."
        self.producer_fifo.push (simple_producer (data))
        self.initiate_send()

    def push_with_producer (self, producer):
        "Queue producer (an object with a more() method) and try to send."
        self.producer_fifo.push (producer)
        self.initiate_send()

    def readable (self):
        "True while the input buffer holds no more than ac_in_buffer_size."
        return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)

    def writable (self):
        """True when there is buffered output, a queued producer, or the
        channel is not yet connected."""
        return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)

    def close_when_done (self):
        "Queue the None sentinel; refill_buffer() closes the channel on reaching it."
        self.producer_fifo.push (None)

    # refill the outgoing buffer by calling the more() method
    # of the first producer in the queue
    def refill_buffer (self):
        """Append one chunk from the first non-exhausted producer to the
        outgoing buffer, discarding exhausted producers on the way."""
        while 1:
            if len(self.producer_fifo):
                p = self.producer_fifo.first()
                # a 'None' in the producer fifo is a sentinel,
                # telling us to close the channel.
                if p is None:
                    # only close once everything buffered has been sent
                    if not self.ac_out_buffer:
                        self.producer_fifo.pop()
                        self.close()
                    return
                data = p.more()
                if data:
                    self.ac_out_buffer = self.ac_out_buffer + data
                    return
                else:
                    # producer is exhausted; move on to the next one
                    self.producer_fifo.pop()
            else:
                return

    def initiate_send (self):
        """Top up the outgoing buffer (unless in push mode) and send at
        most ac_out_buffer_size bytes of it."""
        obs = self.ac_out_buffer_size
        # try to refill the buffer
        if (not self._push_mode) and (len (self.ac_out_buffer) < obs):
            self.refill_buffer()

        if self.ac_out_buffer and self.connected:
            # try to send the buffer
            num_sent = self.send (self.ac_out_buffer[:obs])
            if num_sent:
                # keep whatever was not accepted for the next attempt
                self.ac_out_buffer = self.ac_out_buffer[num_sent:]

    def discard_buffers (self):
        "Emergencies only!  Drop all buffered input, output and queued producers."
        self.ac_in_buffer = ''
        # The original used '==' here, which compared and discarded the
        # result, so the outgoing buffer was never actually cleared.
        self.ac_out_buffer = ''
        while self.producer_fifo:
            self.producer_fifo.pop()

    def push_mode (self, boolean):
        "Turn push mode on or off (see initiate_send)."
        self._push_mode = boolean

    def writable_push (self):
        "Push-mode variant of writable(): connected and output is buffered."
        return self.connected and len(self.ac_out_buffer)

class simple_producer:
    """Producer that hands out a fixed string in pieces of at most
    buffer_size characters."""

    def __init__ (self, data, buffer_size=512):
        self.data = data
        self.buffer_size = buffer_size

    def more (self):
        "Return the next piece of data, or '' once it is exhausted."
        if len (self.data) > self.buffer_size:
            result = self.data[:self.buffer_size]
            self.data = self.data[self.buffer_size:]
            return result
        else:
            result = self.data
            self.data = ''
            return result

class fifo:
    "Minimal first-in first-out queue backed by a Python list."

    def __init__ (self, list=None):
        # A false value (None or an empty list) gets a fresh private
        # list; a non-empty list is used as-is, not copied.
        if not list:
            self.list = []
        else:
            self.list = list

    def __len__ (self):
        return len(self.list)

    def first (self):
        "Return the oldest item without removing it."
        return self.list[0]

    def push (self, data):
        "Append data to the end of the queue."
        self.list.append (data)

    def pop (self):
        "Remove the oldest item; return (1, item), or (0, None) when empty."
        if self.list:
            result = self.list[0]
            del self.list[0]
            return (1, result)
        else:
            return (0, None)

# Given 'haystack', see if any prefix of 'needle' is at its end.  This
# assumes an exact match has already been checked.  Return the number of
# characters matched.
# for example:
# f_p_a_e ("qwerty\r", "\r\n") => 1
# f_p_a_e ("qwerty\r\n", "\r\n") => 2
# f_p_a_e ("qwertydkjf", "\r\n") => 0

# this could maybe be made faster with a computed regex?
# An earlier implementation compared successively shorter slices of
# haystack against needle.  The cached, compiled pattern used below was
# measured at about twice the speed (roughly 555 searches/sec against
# roughly 290), though either is negligible CPU.

import re

# needle -> (compiled pattern, len (needle)); filled in by prefix_regex()
prefix_cache = {}

def prefix_regex (needle):
    """Return (pattern, length) for the non-empty string needle.

    pattern is a compiled `re` pattern that matches any non-empty prefix
    of needle sitting at the very end of the searched string; length is
    len (needle).  Results are memoised in prefix_cache.  An empty
    needle raises IndexError.
    """
    try:
        return prefix_cache[needle]
    except KeyError:
        pass
    # Build c0(?:c1(?:c2...)?)? from the innermost (last) character
    # outwards.  Every character is escaped so that terminators holding
    # regex metacharacters (the '.' in the NNTP end-of-listing marker,
    # for one) only ever match themselves.
    chars = list (needle)
    chars.reverse()
    pattern = re.escape (chars[0])
    for ch in chars[1:]:
        pattern = '%s(?:%s)?' % (re.escape (ch), pattern)
    # \Z anchors at the true end of the string; '$' would also match
    # just before a trailing newline and report a bogus prefix.
    entry = (re.compile (pattern + r'\Z'), len (needle))
    prefix_cache[needle] = entry
    return entry

def find_prefix_at_end (haystack, needle):
    """Return how many leading characters of needle end haystack.

    The longest match wins; 0 means haystack does not end with any
    prefix of needle.  An empty haystack or needle also gives 0.
    """
    if not haystack or not needle:
        return 0
    reg, length = prefix_regex (needle)
    lh = len (haystack)
    # Only the last len (needle) characters can take part in a match.
    # Searching left to right finds the earliest start, i.e. the longest
    # prefix.
    match = reg.search (haystack, max (0, lh - length))
    if match is None:
        return 0
    return lh - match.start()