source: code/simplexorrequestor.py

Last change on this file was 23, checked in by trishank, 7 years ago

Add pre-release upPIR from June 2011.

File size: 12.5 KB
Line 
1"""
2<Author>
3  Justin Cappos
4
5<Start Date>
6  May 21st, 2011
7
8<Description>
9  A requestor object that selects mirrors randomly.   For example, if you want
10  3 mirror privacy and there are 5 available mirrors, you will download all
11  blocks from 3 randomly selected mirrors.   If one of the mirrors you selected
12  fails, you will use a previously non-selected mirror for the remainder of
13  the blocks.
14
15  For more technical explanation, please see the upPIR papers on my website.
16 
17
18
19"""
20
21
22# I'll use this to XOR the result together
23import simplexordatastore
24
25
26# helper functions that are shared
27import uppirlib
28
29
30# used for locking parallel requests
31import threading
32
33# to sleep...
34import time
35
36# TODO / BUG: Ask Geremy if I should be using os.urandom
37import os
38_randomnumberfunction = os.urandom
39
40# used for mirror selection...
41import random
42
43########################### XORRequestGenerator ###############################
44
45
def _reconstruct_block(blockinfolist):
  # private helper: combine the xorblocks returned for one block into a
  # single result.
  #
  # blockinfolist is a list of dicts; only the 'xorblock' entry of each is
  # read.   An empty list raises IndexError.

  firstentry = blockinfolist[0]
  remainingentries = blockinfolist[1:]

  # start from the first xorblock and fold every other one into it
  combined = firstentry['xorblock']
  for entry in remainingentries:
    combined = simplexordatastore.do_xor(combined, entry['xorblock'])

  return combined
56
57
58
class InsufficientMirrors(Exception):
  """Raised when too few mirrors are available to serve the request"""
  pass
61
62
# These provide an easy way for the client XOR request behavior to be
# modified.   If you want to change the policy by which mirrors are selected,
# the failure behavior for offline mirrors, or the way in which blocks
# are selected, this is the place to do it.
67
68
class RandomXORRequestor:
  """
  <Purpose>
    Basic XORRequestGenerator that just picks some number of random mirrors
    and then retrieves all blocks from them.

    The strategy this uses is very, very simple.   First we randomly choose
    $n$ mirrors we want to retrieve blocks from.   If at any point, we have
    a failure when retrieving a block, we replace that mirror with a
    mirror we haven't chosen yet.   If there is no unused mirror left, the
    operation fails with InsufficientMirrors.

  <Side Effects>
    None.

  <Example Use>
    (Illustrative trace; mirrors are shown as strings for brevity.)

    >>> rxgobj = RandomXORRequestor(['mirror1','mirror2','mirror3'],
             [23, 45], { ...# manifest dict omitted # }, 2)

    >>> print rxgobj.get_next_xorrequest()
    ('mirror3',23, '...')   # bitstring omitted
    >>> print rxgobj.get_next_xorrequest()
    ('mirror1',23, '...')   # bitstring omitted
    >>> print rxgobj.get_next_xorrequest()
    # this will block because we didn't say either of the others
    # completed and there are no other mirrors waiting

    >>> rxgobj.notify_success(('mirror1',23,'...'), '...')
    # the bit string and result were omitted from the previous statement
    >>> print rxgobj.get_next_xorrequest()
    ('mirror1',45, '...')   # bitstring omitted
    >>> rxgobj.notify_success(('mirror3',23, '...'), '...')
    >>> print rxgobj.get_next_xorrequest()
    ('mirror3',45, '...')   # bitstring omitted
    >>> rxgobj.notify_failure(('mirror1',45, '...'))
    >>> print rxgobj.get_next_xorrequest()
    ('mirror2',45, '...')
    >>> rxgobj.notify_success(('mirror2',45, '...'), '...')
    >>> rxgobj.notify_success(('mirror3',45, '...'), '...')
    >>> print rxgobj.get_next_xorrequest()
    ()

  """


  def __init__(self, mirrorinfolist, blocklist, manifestdict, privacythreshold, pollinginterval = .1):
    """
    <Purpose>
      Get ready to handle requests for XOR block strings, etc.

    <Arguments>
      mirrorinfolist: a list of dictionaries with information about mirrors

      blocklist: the blocks that need to be retrieved

      manifestdict: the manifest with information about the release.   The
                    'blockcount' entry is read here; 'hashalgorithm' and
                    'blockhashlist' are read later by notify_success.

      privacythreshold: the number of mirrors that would need to collude to
                       break privacy

      pollinginterval: the amount of time to sleep between checking for
                       the ability to serve a mirror.

    <Exceptions>
      TypeError may be raised if invalid parameters are given.

      InsufficientMirrors if there are not enough mirrors

    """
    self.blocklist = blocklist
    self.manifestdict = manifestdict
    self.privacythreshold = privacythreshold
    self.pollinginterval = pollinginterval

    if len(mirrorinfolist) < self.privacythreshold:
      raise InsufficientMirrors("Requested the use of "+str(self.privacythreshold)+" mirrors, but only "+str(len(mirrorinfolist))+" were available.")

    # now we do the 'random' part.   I copy the mirrorinfolist to avoid changing
    # the caller's list in place.
    self.fullmirrorinfolist = mirrorinfolist[:]
    random.shuffle(self.fullmirrorinfolist)

    # one bookkeeping dict per selected mirror.   'blocksneeded' and
    # 'blockbitstringlist' are parallel lists: entry i of one goes with
    # entry i of the other.
    self.activemirrorinfolist = []
    for mirrorinfo in self.fullmirrorinfolist[:self.privacythreshold]:
      thisrequestinfo = {}
      thisrequestinfo['mirrorinfo'] = mirrorinfo
      thisrequestinfo['servingrequest'] = False
      thisrequestinfo['blocksneeded'] = blocklist[:]
      thisrequestinfo['blockbitstringlist'] = []

      self.activemirrorinfolist.append(thisrequestinfo)

    bitstringlength = uppirlib.compute_bitstring_length(manifestdict['blockcount'])

    # I'll generate random bitstrings for N-1 of the mirrors (one per block)...
    for thisrequestinfo in self.activemirrorinfolist[:-1]:
      for unusedblock in blocklist:
        thisrequestinfo['blockbitstringlist'].append(_randomnumberfunction(bitstringlength))

    # now, let's do the 'derived' ones for the last mirror...
    for position, wantedblock in enumerate(blocklist):
      thisbitstring = '\0'*bitstringlength

      # xor the random strings for this position together
      for requestinfo in self.activemirrorinfolist[:-1]:
        thisbitstring = simplexordatastore.do_xor(thisbitstring, requestinfo['blockbitstringlist'][position])

      # ...and flip the appropriate bit for the block we want
      thisbitstring = uppirlib.flip_bitstring_bit(thisbitstring, wantedblock)
      self.activemirrorinfolist[-1]['blockbitstringlist'].append(thisbitstring)

    # we're done setting up the bitstrings!

    # want to have a structure for locking.   It guards all of the tables
    # below as well as the activemirrorinfolist.
    self.tablelock = threading.Lock()

    # and we'll keep track of the ones that are waiting in the wings...
    self.backupmirrorinfolist = self.fullmirrorinfolist[self.privacythreshold:]

    # set by notify_failure once a mirror has failed and nothing was left to
    # replace it.   get_next_xorrequest checks this so that waiting threads
    # raise InsufficientMirrors instead of polling forever.
    self.mirrorsexhausted = False

    # the returned blocks are put here (block number -> list of xorblockdicts)
    self.returnedxorblocksdict = {}
    for blocknum in blocklist:
      # make these all empty lists to start with
      self.returnedxorblocksdict[blocknum] = []

    # and here is where they are put when reconstructed
    self.finishedblockdict = {}

    # and we're ready!


  def get_next_xorrequest(self):
    """
    <Purpose>
      Gets the next requesttuple that should be returned

    <Arguments>
      None

    <Exceptions>
      InsufficientMirrors if a mirror failed and there was no mirror left
      to replace it (the retrieval can no longer complete)

    <Returns>
      Either a requesttuple (mirrorinfo, blocknumber, bitstring) or ()
      when all strings have been retrieved...

    """

    # Cases I need to worry about:
    #   0) a failed mirror could not be replaced -> raise InsufficientMirrors
    #   1) nothing that still needs to be requested -> return ()
    #   2) requests remain, but all mirrors are busy -> block until ready
    #   3) there is a request ready -> return the tuple
    #

    # I'll exit via return (or raise).   I will loop to sleep while waiting.
    # I could use a condition variable here, but this should be fine.   There
    # should almost always be < 5 threads.   Also, why would we start more
    # threads than there are mirrors we will contact?   (As such, sleeping
    # should only happen at the very end)
    while True:
      # lock the table...
      self.tablelock.acquire()

      # but always release it
      try:
        # the slot of the unreplaceable mirror stays marked as serving, so
        # without this check every waiting thread would sleep forever.
        if self.mirrorsexhausted:
          raise InsufficientMirrors("There are no replacement mirrors")

        stillserving = False
        for requestinfo in self.activemirrorinfolist:

          # if this mirror is serving a request, skip it...
          if requestinfo['servingrequest']:
            stillserving = True
            continue

          # this mirror is done...
          if len(requestinfo['blocksneeded']) == 0:
            continue

          # otherwise set it to be taken.   The block and bitstring stay at
          # the head of their lists until notify_success removes them.
          requestinfo['servingrequest'] = True
          return (requestinfo['mirrorinfo'], requestinfo['blocksneeded'][0], requestinfo['blockbitstringlist'][0])

        if not stillserving:
          return ()

      finally:
        # I always want someone else to be able to get the lock
        self.tablelock.release()

      # otherwise, I've looked and nothing is ready...   I'll sleep and retry
      time.sleep(self.pollinginterval)


  def notify_failure(self, xorrequesttuple):
    """
    <Purpose>
      Handles that a mirror has failed.   The failed mirror's slot (with its
      remaining blocks and bitstrings) is handed to the next backup mirror.

    <Arguments>
      The XORrequesttuple that was returned by get_next_xorrequest

    <Exceptions>
      InsufficientMirrors if there are not enough mirrors

      An internal error is raised if the XORrequesttuple is bogus

    <Returns>
      None

    """
    # I should lock the table...
    self.tablelock.acquire()

    # but *always* release it
    try:
      # if we're out of replacements, quit (and make sure that threads
      # waiting in get_next_xorrequest quit as well)
      if len(self.backupmirrorinfolist) == 0:
        self.mirrorsexhausted = True
        raise InsufficientMirrors("There are no replacement mirrors")

      failedmirrorsinfo = xorrequesttuple[0]

      # now, let's find the activemirror this corresponds to.
      for activemirrorinfo in self.activemirrorinfolist:
        if activemirrorinfo['mirrorinfo'] == failedmirrorsinfo:

          # only consume a backup mirror once we know the failure is for a
          # mirror we are actually using.
          activemirrorinfo['mirrorinfo'] = self.backupmirrorinfolist.pop(0)
          activemirrorinfo['servingrequest'] = False
          return

      raise Exception("InternalError: Unknown mirror in notify_failure")

    finally:
      # release the lock
      self.tablelock.release()


  def notify_success(self, xorrequesttuple, xorblock):
    """
    <Purpose>
      Handles the receipt of an xorblock.   Once privacythreshold xorblocks
      have been received for a block, the block is reconstructed, checked
      against the manifest's hash, and stored for return_block.

    <Arguments>
      xorrequesttuple: The tuple that was returned by get_next_xorrequest

      xorblock: the data returned by the mirror

    <Exceptions>
      AssertionError / IndexError / TypeError / InternalError if the
      XORrequesttuple is bogus

      Exception if the reconstructed block does not match the manifest hash

    <Returns>
      None

    """

    # acquire the lock...
    self.tablelock.acquire()
    #... but always release it
    try:
      thismirrorsinfo = xorrequesttuple[0]

      # now, let's find the activemirror this corresponds to.
      for activemirrorinfo in self.activemirrorinfolist:
        if activemirrorinfo['mirrorinfo'] != thismirrorsinfo:
          continue

        # check that the tuple matches the request at the head of this
        # mirror's lists *before* changing any state.   These are explicit
        # raises (not assert statements) so they still run under python -O.
        blocknumber = activemirrorinfo['blocksneeded'][0]
        bitstring = activemirrorinfo['blockbitstringlist'][0]
        if blocknumber != xorrequesttuple[1]:
          raise AssertionError("Block number does not match the outstanding request")
        if bitstring != xorrequesttuple[2]:
          raise AssertionError("Bitstring does not match the outstanding request")

        # the request is complete: remove it and mark the mirror as free
        del activemirrorinfo['blocksneeded'][0]
        del activemirrorinfo['blockbitstringlist'][0]
        activemirrorinfo['servingrequest'] = False

        # add the xorblockinfo to the dict
        xorblockdict = {}
        xorblockdict['bitstring'] = bitstring
        xorblockdict['mirrorinfo'] = thismirrorsinfo
        xorblockdict['xorblock'] = xorblock
        self.returnedxorblocksdict[blocknumber].append(xorblockdict)

        # if we don't have all of the pieces, we're done for now
        if len(self.returnedxorblocksdict[blocknumber]) != self.privacythreshold:
          return

        # if we have all of the pieces, reconstruct it
        resultingblock = _reconstruct_block(self.returnedxorblocksdict[blocknumber])

        # let's check the hash...
        resultingblockhash = uppirlib.find_hash(resultingblock, self.manifestdict['hashalgorithm'])
        if resultingblockhash != self.manifestdict['blockhashlist'][blocknumber]:
          # TODO: We should notify the vendor!
          raise Exception('Should notify vendor that one of the mirrors or manifest is corrupt')

        # otherwise, let's put this in the finishedblockdict
        self.finishedblockdict[blocknumber] = resultingblock

        # it should be safe to delete this
        del self.returnedxorblocksdict[blocknumber]

        return

      raise Exception("InternalError: Unknown mirror in notify_success")

    finally:
      # release the lock
      self.tablelock.release()


  def return_block(self, blocknum):
    """
    <Purpose>
      Delivers a block.  This presumes there is sufficient cached xorblock info

    <Arguments>
      blocknum: the block number to return

    <Exceptions>
      KeyError if the block isn't known (it has not been reconstructed)

    <Returns>
      The block

    """
    return self.finishedblockdict[blocknum]
416   
417   
Note: See TracBrowser for help on using the repository browser.