source: code/uppir_vendor.py @ 133

Last change on this file since 133 was 23, checked in by trishank, 7 years ago

Add pre-release upPIR from June 2011.

File size: 11.8 KB
Line 
1"""
2<Author>
3  Justin Cappos
4  (inspired by a previous version written by Geremy Condra)
5
6<Start Date>
7  May 15th, 2011
8
9<Description>
10  Vendor code for upPIR.   The vendor serves the manifest and mirror list.
11  Thus it acts as a way for mirrors to advertise that they are alive and
12  for clients to find living mirrors.   
13
14  A later version will support client notifications of cheating.
15
16  For more technical explanation, please see the upPIR papers on my website.
17 
18
19<Usage>
20  $ python uppir_vendor.py
21
22
23<Options>
24
25  See Below
26
27"""
28
29# This file is laid out in three main parts.   First, there are helper routines
30# that manage the addition and expiration of mirrorlist content.   Following
31# this are the server routines that handle communications with the clients
32# or mirrors.   The final part contains the argument parsing and main
33# function.   To understand the code, it is recommended one starts at main
34# and reads from there.
35#
36# EXTENSION POINTS:
37#
38# To handle malicious mirrors, the client and vendor will need to have
39# support for malicious block reporting.   This change will be primarily
40# in the server portion, although the vendor would also need to include
41# a way to blacklist offending mirrors to prevent them from re-registering.
42
43
44
45
46import sys
47
48import optparse
49
50# helper functions that are shared
51import uppirlib
52
53import optparse
54
55
# upPIR is written for Python 2.5 through 2.7; refuse to run on anything else.
if not (sys.version_info[0] == 2 and sys.version_info[1] >= 5):
  print("Requires Python >= 2.5 and < 3.0")
  sys.exit(1)

# Pick a JSON implementation.  Python 2.6+ ships json in the standard
# library; Python 2.5 needs the third-party simplejson package instead.
if sys.version_info[1] != 5:
  # This really should be there, so no try-except block here.
  import json
else:
  try:
    import simplejson as json
  except ImportError:
    # This may have plausibly been forgotten
    print("Requires simplejson on Python 2.5.X")
    sys.exit(1)
72
73
74# This is used to communicate with clients with a message like abstraction
75import session
76
77# used to get a lock
78import threading
79
80
81# to handle upPIR protocol requests
82import SocketServer
83
84# to run in the background...
85import daemon
86
87
88# for logging purposes...
89import time
90import traceback
91
# Open file object for the vendor log; parse_options() assigns it.
_logfo = None

def _log(stringtolog):
  """Append stringtolog to the log file as a single timestamped line.

  The line is written as '<time.time()> <stringtolog>' plus a newline, and
  the file is flushed right after every write.  _logfo must already be open.
  """
  timestamp = str(time.time())
  _logfo.write(timestamp + " " + stringtolog + "\n")
  _logfo.flush()
98
99
100
# JAC: I don't normally like to use Python's socket servers because of the
#      lack of control, but I'll give it a try this time.  Passing arguments
#      to request handlers is a PITA, so module-level globals are used instead.

# Already-serialized payloads that the request handler sends back verbatim.
# main() fills in both before the server is started.
_global_rawmanifestdata = _global_rawmirrorlist = None

# These are more defensible: per-mirror advertisement records keyed by the
# mirror's IP, and the lock that guards every access to that dict.
_global_mirrorinfodict = dict()
_global_mirrorinfolock = threading.Lock()
110
111
112########################### Mirrorlist manipulation ##########################
113import time
114
115
116
def _check_for_expired_mirrorinfo():
  """Remove expired mirrors and rebuild the serialized mirror list.

  A mirror is expired once more than --mirrorexpirytime seconds have passed
  since its last advertisement.  After pruning, _global_rawmirrorlist is
  replaced with the JSON encoding of the remaining mirrors' info dicts.

  The lock is acquired without blocking.  If it is already held (another
  cleanup, or a mirror being added), this call does nothing and the
  previously serialized list stays in place.
  """
  # I'll be updating this
  global _global_rawmirrorlist

  # No need to block and wait for this to happen if there are multiple of these
  if not _global_mirrorinfolock.acquire(False):
    return

  # always release the lock...
  try:
    now = time.time()

    # Collect the expired IPs first and delete them afterwards.  Deleting
    # from a dict while iterating over it raises RuntimeError ("dictionary
    # changed size during iteration"), which would abort the mirror list
    # request every time a mirror actually expired.
    expiredips = [mirrorip for mirrorip in _global_mirrorinfodict
                  if now > _commandlineoptions.mirrorexpirytime +
                           _global_mirrorinfodict[mirrorip]['advertisetime']]
    for mirrorip in expiredips:
      del _global_mirrorinfodict[mirrorip]

    # now rebuild the mirrorlist from what is left and replace the global
    mirrorlist = [entry['mirrorinfo'] for entry in _global_mirrorinfodict.values()]
    _global_rawmirrorlist = json.dumps(mirrorlist)

  finally:
    # always release
    _global_mirrorinfolock.release()
147
148
149
150
def _add_mirrorinfo_to_list(thismirrorinfo):
  """Record a mirror's advertisement, replacing any earlier one from its IP.

  The entry stored under thismirrorinfo['ip'] holds the mirror info itself
  ('mirrorinfo') and the time it was recorded ('advertisetime').
  _check_for_expired_mirrorinfo compares that time against the expiry window.
  """
  key = thismirrorinfo['ip']

  _global_mirrorinfolock.acquire()
  try:
    # Timestamp only once the lock is held, so any time spent waiting for
    # the lock does not count against the mirror.
    _global_mirrorinfodict[key] = dict(mirrorinfo=thismirrorinfo,
                                       advertisetime=time.time())
  finally:
    _global_mirrorinfolock.release()
167 
168
169 
170
171
172######################### Serve upPIR vendor requests ########################
173
174
# The stock threaded TCP server needs very little customization here.
class ThreadedVendorServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
  """TCP server that handles each upPIR vendor connection in its own thread.

  allow_reuse_address is enabled so the vendor's listening address can be
  rebound straight away (the standard library sets SO_REUSEADDR before
  binding when this flag is true).
  """
  allow_reuse_address = True
178
179
class ThreadedVendorRequestHandler(SocketServer.BaseRequestHandler):
  """Serves one upPIR vendor request per connection.

  Each connection carries a single request message (read with
  session.recvmessage) and gets a single reply (sent with
  session.sendmessage).  Recognized requests:

    'GET MANIFEST'           -> the raw manifest data
    'GET MIRRORLIST'         -> the serialized list of live mirrors
    'MIRRORADVERTISE<json>'  -> register/refresh a mirror; 'OK' or an error
    'HELLO'                  -> 'VENDORHI!'

  Anything else is answered with 'Invalid request type'.  Every request is
  logged together with the peer's IP and port.
  """

  def handle(self):
    # read the request from the socket...
    requeststring = session.recvmessage(self.request)

    # the peer's address goes into every log line (and verifies mirrors)
    remoteip, remoteport = self.request.getpeername()

    if requeststring == 'GET MANIFEST':
      # a client wants the manifest; it is served exactly as read from disk
      session.sendmessage(self.request, _global_rawmanifestdata)
      _log("UPPIRVendor "+remoteip+" "+str(remoteport)+" manifest request")

    elif requeststring == 'GET MIRRORLIST':
      # Try to prune expired mirrors first.  If the mirror-info lock is busy
      # the cleanup is a NOOP and the current serialized list is served.
      _check_for_expired_mirrorinfo()

      # reply with the mirror list
      session.sendmessage(self.request, _global_rawmirrorlist)
      _log("UPPIRVendor "+remoteip+" "+str(remoteport)+" mirrorlist request")

    elif requeststring.startswith('MIRRORADVERTISE'):
      # This is a mirror telling us it's ready to serve clients.
      self._handle_mirroradvertise(requeststring[len('MIRRORADVERTISE'):],
                                   remoteip, remoteport)

    elif requeststring == 'HELLO':
      # liveness check: send a fixed reply
      session.sendmessage(self.request, "VENDORHI!")
      _log("UPPIRVendor "+remoteip+" "+str(remoteport)+" VENDORHI!")

    else:
      # we don't know what this is!   Log and tell the requestor
      _log("UPPIRVendor "+remoteip+" "+str(remoteport)+" Invalid request type starts:'"+requeststring[:5]+"'")
      session.sendmessage(self.request, 'Invalid request type')


  def _handle_mirroradvertise(self, mirrorrawdata, remoteip, remoteport):
    # Validate a mirror's advertisement and, if acceptable, add it to the
    # mirror list.  Exactly one reply is sent to the mirror in every case.
    logprefix = "UPPIRVendor "+remoteip+" "+str(remoteport)

    # refuse mirror info larger than we are willing to serve to clients
    if len(mirrorrawdata) > _commandlineoptions.maxmirrorinfo:
      session.sendmessage(self.request, "Error, mirrorinfo too large!")
      _log(logprefix+" mirrorinfo too large: "+str(len(mirrorrawdata)))
      return

    # it must deserialize...
    try:
      mirrorinfodict = json.loads(mirrorrawdata)
    except (TypeError, ValueError):
      # sys.exc_info() keeps this valid on Python 2.5 (no 'as') and on
      # Python 3 (no ', e')
      e = sys.exc_info()[1]
      session.sendmessage(self.request, "Error cannot deserialize mirrorinfo!")
      _log(logprefix+" cannot deserialize mirrorinfo!"+str(e))
      return

    # ...into a dictionary that has at least the 'ip' and 'port' keys...
    if (not isinstance(mirrorinfodict, dict) or 'ip' not in mirrorinfodict
        or 'port' not in mirrorinfodict):
      session.sendmessage(self.request, "Error, mirrorinfo has an invalid format.")
      _log(logprefix+" mirrorinfo has an invalid format")
      return

    # ...and a mirror may only advertise itself: the IP it claims must be
    # the IP this advertisement arrived from.
    if mirrorinfodict['ip'] != remoteip:
      session.sendmessage(self.request, "Error, must provide mirrorinfo from the mirror's IP")
      _log(logprefix+" mirrorinfo provided from the wrong IP")
      return

    # add the information to the mirrorlist
    _add_mirrorinfo_to_list(mirrorinfodict)

    # and notify the mirror
    session.sendmessage(self.request, 'OK')
    _log(logprefix+" mirrorinfo update "+str(len(mirrorrawdata)))
269
270
271
272
273
def start_vendor_service(manifestdict, ip, port):
  """Start the threaded upPIR vendor server on (ip, port) in the background.

  _global_rawmanifestdata must already be loaded (main() does this).
  manifestdict is accepted but not used here.  serve_forever() runs in a
  separate thread named "upPIR vendor server", so this function returns as
  soon as that thread has been started.
  """
  # this should be done before we are called
  assert _global_rawmanifestdata is not None

  server = ThreadedVendorServer((ip, port), ThreadedVendorRequestHandler)

  # serve_forever() never returns, hence the dedicated thread
  servethread = threading.Thread(target=server.serve_forever,
                                 name="upPIR vendor server")
  servethread.start()
286
287
288
289
290
291########################### Option parsing and main ###########################
_commandlineoptions = None

def parse_options():
  """
  <Purpose>
    Parses command line arguments.

  <Arguments>
    None

  <Side Effects>
    Sets _commandlineoptions to the parsed options and opens the log file
    (in append mode) as _logfo.

  <Exceptions>
    Malformed options are handled by optparse itself, which prints a usage
    message and exits.  This function prints an error and exits with
    status 1 if there are extra positional arguments, or if --maxmirrorinfo
    or --mirrorexpirytime is not positive.  An error opening the log file
    propagates from open().

  <Returns>
    None
  """
  global _commandlineoptions
  global _logfo

  # should be true unless we're initing twice...
  assert _commandlineoptions is None

  parser = optparse.OptionParser()

  # There are deliberately no --ip / --port options: the vendor's address is
  # taken from the manifest (vendorhostname / vendorport) in main().

  parser.add_option("","--manifestfile", dest="manifestfilename",
        type="string", default="manifest.dat",
        help="The manifest file to use (default manifest.dat).")

  parser.add_option("","--foreground", dest="daemonize", action="store_false",
        default=True,
        help="Do not detach from the terminal and run in the background")

  parser.add_option("","--logfile", dest="logfilename",
        type="string", default="vendor.log",
        help="The file to write log data to (default vendor.log).")

  parser.add_option("","--maxmirrorinfo", dest="maxmirrorinfo",
        type="int", default=10240,
        help="The maximum amount of serialized data a mirror can add to the mirror list (default 10K)")

  parser.add_option("","--mirrorexpirytime", dest="mirrorexpirytime",
        type="int", default=300,
        help="The number of seconds of inactivity before expiring a mirror (default 300).")

  # let's parse the args
  (_commandlineoptions, remainingargs) = parser.parse_args()

  # check the maxmirrorinfo
  if _commandlineoptions.maxmirrorinfo <= 0:
    print("Max mirror info size must be positive")
    sys.exit(1)

  # A non-positive expiry time would expire mirrors (almost) as soon as they
  # advertise, so the vendor would effectively never list any mirror.
  if _commandlineoptions.mirrorexpirytime <= 0:
    print("Mirror expiry time must be positive")
    sys.exit(1)

  if remainingargs:
    # same text as Python 2's  print "Unknown options",remainingargs
    print("Unknown options " + str(remainingargs))
    sys.exit(1)

  # try to open the log file...
  _logfo = open(_commandlineoptions.logfilename, 'a')
366
367
368
def main():
  """Load the manifest, optionally daemonize, and start the vendor server.

  Reads the file named by --manifestfile, publishes it (and an initially
  empty mirror list) through the module globals the request handler serves
  from, and runs it through uppirlib.parse_manifest as a sanity check.  It
  then starts the server on the manifest's vendorhostname / vendorport.
  """
  global _global_rawmanifestdata
  global _global_rawmirrorlist

  # read in the manifest file, closing the handle deterministically rather
  # than relying on garbage collection
  manifestfo = open(_commandlineoptions.manifestfilename)
  try:
    rawmanifestdata = manifestfo.read()
  finally:
    manifestfo.close()

  # an ugly hack, but Python's request handlers don't have an easy way to
  # pass arguments
  _global_rawmanifestdata = rawmanifestdata
  _global_rawmirrorlist = json.dumps([])

  # I do this just for the sanity / corruption check
  manifestdict = uppirlib.parse_manifest(rawmanifestdata)

  vendorip = manifestdict['vendorhostname']
  vendorport = manifestdict['vendorport']

  # We should detach here.   I don't do it earlier so that error
  # messages are written to the terminal...   I don't do it later so that any
  # threads don't exist already.   If I do put it much later, the code hangs...
  if _commandlineoptions.daemonize:
    daemon.daemonize()

  # we're now ready to handle clients!
  _log('ready to start servers!')

  # first, let's fire up the upPIR server
  start_vendor_service(manifestdict, vendorip, vendorport)

  _log('servers started!')
403
404
405
if __name__ == '__main__':
  parse_options()
  try:
    main()
  except Exception:
    # sys.exc_info() keeps this valid on Python 2.5 (no 'as') and on
    # Python 3 (no ', e')
    e = sys.exc_info()[1]
    # report on the terminal (visible when running with --foreground)...
    print(str(type(e))+" "+str(e))
    # ...and in the log, presumably the only record once daemonized.
    # format_exception (unlike format_tb) ends with the exception type and
    # message; str() of the list keeps the entry on a single log line.
    _log(str(traceback.format_exception(*sys.exc_info())))
    sys.exit(1)
417
Note: See TracBrowser for help on using the repository browser.