source: code/uppir_mirror.py @ 23

Last change on this file since 23 was 23, checked in by trishank, 7 years ago

Add pre-release upPIR from June 2011.

File size: 12.2 KB
Line 
1"""
2<Author>
3  Justin Cappos
4  (inspired from a previous version by Geremy Condra)
5
6<Start Date>
7  May 15th, 2011
8
9<Description>
10  Mirror code that serves upPIR files.   A client obtains a list of live
11  mirrors from a vendor.   The client then sends bit strings to the mirror and
12  the mirror returns XORed blocks of data.   The mirror will periodically
13  announce its liveness to the vendor it is serving content for.   
14
15  The files specified in the manifest must already exist on the local machine.
16
17  For more technical explanation, please see the upPIR papers on my website.
18 
19
20<Usage>
21  $ python uppir_mirror.py
22
23  $ python uppir_mirror.py --help
24
25<Options>
26  See below...
27"""
28
29
30# This file is laid out in four main parts.   First, there are some helper
31# functions to advertise the mirror with the vendor.   The second section
32# includes the functionality to serve content via upPIR.   The third section
33# serves data via HTTP.   The final part contains the option
34# parsing and main.   To get an overall feel for the code, it is recommended
35# to follow the execution from main on.
36#
37# EXTENSION POINTS:
38#
39# One can define new xordatastore types (although I need a more explicit plugin
40# module).   These could include memoization and other optimizations to
41# further improve the speed of XOR processing.
42#
43
44
45
46
47import sys
48
49import optparse
50
51# Holds the mirror data and produces the XORed blocks
52import fastsimplexordatastore
53
54# helper functions that are shared
55import uppirlib
56
57import optparse
58
59# TODO: consider FTP via pyftpdlib (?)
60
61# This is used to communicate with clients with a message like abstraction
62import session
63
64# used to start the UPPIR / HTTP servers in parallel
65import threading
66
67import getmyip
68
69
70# to handle upPIR protocol requests
71import SocketServer
72
73# to run in the background...
74import daemon
75
76
77# for logging purposes...
78import time
79import traceback
80
81_logfo=None
82
83def _log(stringtolog):
84  # helper function to log data
85  _logfo.write(str(time.time()) +" "+stringtolog+"\n")
86  _logfo.flush()
87
88
89
90# JAC: I don't normally like to use Python's socket servers because of the lack
91#      of control but I'll give it a try this time.   Passing arguments to
92#      requesthandlers is a PITA.   I'll use a messy global instead
93_global_myxordatastore = None
94_global_manifestdict = None
95
96
97#################### Advertising ourself with the vendor ######################
98
99def _send_mirrorinfo():
100  # private function that sends our mirrorinfo to the vendor
101
102
103  # adding more information here  is a natural way to extend the mirror /
104  # client.   The vendor should not need to be changed
105  # at a minimum, the 'ip' and 'port' are required to provide the client with
106  # information about how to contact the mirror.
107  mymirrorinfo = {'ip':_commandlineoptions.ip, 'port':_commandlineoptions.port}
108
109  uppirlib.transmit_mirrorinfo(mymirrorinfo, _global_manifestdict['vendorhostname'], _global_manifestdict['vendorport'])
110 
111
112
113
114############################### Serve via upPIR ###############################
115
116
117# I don't need to change this much, I think...
118class ThreadedXORServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
119  allow_reuse_address=True
120
121
122class ThreadedXORRequestHandler(SocketServer.BaseRequestHandler):
123
124  def handle(self):
125
126    # read the request from the socket...
127    requeststring = session.recvmessage(self.request)
128
129    # for logging purposes, get the remote info
130    remoteip, remoteport = self.request.getpeername()
131
132    # if it's a request for a XORBLOCK
133    if requeststring.startswith('XORBLOCK'):
134
135      bitstring = requeststring[len('XORBLOCK'):]
136 
137      expectedbitstringlength = uppirlib.compute_bitstring_length(_global_myxordatastore.numberofblocks)
138
139      if len(bitstring) != expectedbitstringlength:
140        # Invalid request length...
141        _log("UPPIR "+remoteip+" "+str(remoteport)+" Invalid request with length: "+str(len(bitstring)))
142
143        session.sendmessage(self.request, 'Invalid request length')
144        return
145 
146      # Now let's process this...
147      xoranswer = _global_myxordatastore.produce_xor_from_bitstring(bitstring)
148
149      # and send the reply.
150      session.sendmessage(self.request, xoranswer)
151      _log("UPPIR "+remoteip+" "+str(remoteport)+" GOOD")
152
153      # done!
154      return
155
156    elif requeststring == 'HELLO':
157      # send a reply.
158      session.sendmessage(self.request, "HI!")
159      _log("UPPIR "+remoteip+" "+str(remoteport)+" HI!")
160
161      # done!
162      return
163
164    else:
165      # we don't know what this is!   Log and tell the requestor
166      _log("UPPIR "+remoteip+" "+str(remoteport)+" Invalid request type starts:'"+requeststring[:5]+"'")
167
168      session.sendmessage(self.request, 'Invalid request type')
169      return
170
171
172
173
174
175def service_uppir_clients(myxordatastore, ip, port):
176
177  # this should be done before we are called
178  assert(_global_myxordatastore != None)
179
180  # create the handler / server
181  xorserver = ThreadedXORServer((ip, port), ThreadedXORRequestHandler)
182 
183
184  # and serve forever!   This call will not return which is why we spawn a new
185  # thread to handle it
186  threading.Thread(target=xorserver.serve_forever, name="upPIR mirror server").start()
187
188
189
190################################ Serve via HTTP ###############################
191
192import BaseHTTPServer
193
194import urlparse
195
196# handle a HTTP request
197class MyHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
198
199  def do_GET(self):
200
201    # get the path part of the request.   Ignore the host name, etc.
202    requestedfilename = urlparse.urlparse(self.path).path
203
204    # if there is a leading '/' then kill it.
205    if requestedfilename.startswith('/'):
206      requestedfilename = requestedfilename[1:]
207
208    # let's look for the file...
209    for fileinfo in _global_manifestdict['fileinfolist']:
210      # great, let's serve it!
211      if requestedfilename == fileinfo['filename']:
212        # it's a good query!   Send 200!
213        self.send_response(200)
214        self.end_headers()
215
216        # and send the response!
217        filedata = _global_myxordatastore.get_data(fileinfo['offset'],fileinfo['length'])
218        self.wfile.write(filedata)
219        return
220
221    # otherwise, it's unknown...
222    self.send_error(404)
223    return
224   
225  # log HTTP information
226  def log_message(self,format, *args):
227    _log("HTTP "+self.client_address[0]+" "+str(self.client_address[1])+" "+(format % args))
228
229
230
231
232def service_http_clients(myxordatastore, manifestdict, ip, port):
233  # time to serve HTTP clients...
234 
235  # this must have already been set
236  assert(_global_myxordatastore != None)
237  assert(_global_manifestdict != None)
238
239  httpserver = BaseHTTPServer.HTTPServer((ip, port), MyHTTPRequestHandler)
240
241  # and serve forever!   Just like with upPIR, this doesn't return so we need a
242  # new thread...
243  threading.Thread(target=httpserver.serve_forever, name="HTTP server").start()
244
245
246
247
248
249########################### Option parsing and main ###########################
250_commandlineoptions = None
251
252def parse_options():
253  """
254  <Purpose>
255    Parses command line arguments.
256
257  <Arguments>
258    None
259 
260  <Side Effects>
261    All relevant data is added to _commandlineoptions
262
263  <Exceptions>
264    These are handled by optparse internally.   I believe it will print / exit
265    itself without raising exceptions further.   I do print an error and
266    exit if there are extra args...
267
268  <Returns>
269    None
270  """
271  global _commandlineoptions
272  global _logfo
273
274  # should be true unless we're initing twice...
275  assert(_commandlineoptions==None)
276
277  parser = optparse.OptionParser()
278
279  parser.add_option("","--ip", dest="ip", type="string", metavar="IP",
280        default=getmyip.getmyip(), help="Listen for clients on the following IP (default is the public facing IP)")
281
282  parser.add_option("","--port", dest="port", type="int", metavar="portnum",
283        default=62294, help="Use the following port to serve upPIR clients (default 62294)")
284
285  parser.add_option("","--http", dest="http", action="store_true",
286        default=False, help="Serve legacy clients via HTTP (default False)")
287
288  parser.add_option("","--httpport", dest="httpport", type="int",
289        default=80, help="Serve HTTP clients on this port (default 80)")
290
291  parser.add_option("","--mirrorroot", dest="mirrorroot", type="string",
292        metavar="dir", default=".",
293        help="The base directory where all mirror files live under")
294
295  parser.add_option("","--retrievemanifestfrom", dest="retrievemanifestfrom",
296        type="string", metavar="vendorIP:port", default="",
297        help="Specifies the vendor to retrieve the manifest from (default None).")
298
299  parser.add_option("","--manifestfile", dest="manifestfilename",
300        type="string", default="manifest.dat",
301        help="The manifest file to use (default manifest.dat).")
302
303  parser.add_option("","--foreground", dest="daemonize", action="store_false",
304        default=True,
305        help="Do not detach from the terminal and run in the background")
306
307  parser.add_option("","--logfile", dest="logfilename",
308        type="string", default="mirror.log",
309        help="The file to write log data to (default mirror.log).")
310
311  parser.add_option("","--announcedelay", dest="mirrorlistadvertisedelay",
312        type="int", default=60,
313        help="How many seconds should I wait between vendor notifications? (default 60).")
314
315
316  # let's parse the args
317  (_commandlineoptions, remainingargs) = parser.parse_args()
318
319
320  # check the arguments
321  if _commandlineoptions.port <=0 or _commandlineoptions.port >65535:
322    print "Specified port number out of range"
323    sys.exit(1)
324
325  if _commandlineoptions.httpport <=0 or _commandlineoptions.httpport >65535:
326    print "Specified HTTP port number out of range"
327    sys.exit(1)
328
329  if _commandlineoptions.mirrorlistadvertisedelay < 0:
330    print "Mirror advertise delay must be positive"
331    sys.exit(1)
332
333  if remainingargs:
334    print "Unknown options",remainingargs
335    sys.exit(1)
336
337  # try to open the log file...
338  _logfo = open(_commandlineoptions.logfilename, 'a')
339
340
341def main():
342  global _global_myxordatastore
343  global _global_manifestdict
344
345 
346  # If we were asked to retrieve the mainfest file, do so...
347  if _commandlineoptions.retrievemanifestfrom:
348    # We need to download this file...
349    rawmanifestdata = uppirlib.retrieve_rawmanifest(_commandlineoptions.retrievemanifestfrom)
350
351    # ...make sure it is valid...
352    manifestdict = uppirlib.parse_manifest(rawmanifestdata)
353   
354    # ...and write it out if it's okay
355    open(_commandlineoptions.manifestfilename, "w").write(rawmanifestdata)
356
357
358  else:
359    # Simply read it in from disk
360
361    rawmanifestdata = open(_commandlineoptions.manifestfilename).read()
362
363    manifestdict = uppirlib.parse_manifest(rawmanifestdata)
364 
365  # We should detach here.   I don't do it earlier so that error
366  # messages are written to the terminal...   I don't do it later so that any
367  # threads don't exist already.   If I do put it much later, the code hangs...
368  if _commandlineoptions.daemonize:
369    daemon.daemonize()
370
371
372
373  myxordatastore = fastsimplexordatastore.XORDatastore(manifestdict['blocksize'], manifestdict['blockcount'])
374
375  # now let's put the content in the datastore in preparation to serve it
376  uppirlib.populate_xordatastore(manifestdict, myxordatastore, rootdir = _commandlineoptions.mirrorroot)
377   
378  # we're now ready to handle clients!
379  _log('ready to start servers!')
380
381  # an ugly hack, but Python's request handlers don't have an easy way to
382  # pass arguments
383  _global_myxordatastore = myxordatastore
384  _global_manifestdict = manifestdict
385 
386  # first, let's fire up the upPIR server
387  service_uppir_clients(myxordatastore, _commandlineoptions.ip, _commandlineoptions.port)
388
389  # If I should serve legacy clients via HTTP, let's start that up...
390  if _commandlineoptions.http:
391    service_http_clients(myxordatastore, manifestdict, _commandlineoptions.ip, _commandlineoptions.httpport)
392
393  _log('servers started!')
394
395  # let's send the mirror information periodically...
396  # we should log any errors...
397  while True:
398    try:
399      _send_mirrorinfo()
400    except Exception, e:
401      _log(str(e)+"\n"+str(traceback.format_tb(sys.exc_info()[2])))
402
403    time.sleep(_commandlineoptions.mirrorlistadvertisedelay)
404
405
406
407if __name__ == '__main__':
408  parse_options()
409  try:
410    main()
411  except Exception, e:
412    # log errors to prevent silent exiting...   
413    print(str(type(e))+" "+str(e))
414    # this mess prints a not-so-nice traceback, but it does contain all
415    # relevant info
416    _log(str(e)+"\n"+str(traceback.format_tb(sys.exc_info()[2])))
417    sys.exit(1)
418
Note: See TracBrowser for help on using the repository browser.