import random import math import heapq # for nlargest from operator import itemgetter def randinst(n): """generate instance x\in\mathbb R^n, with each entry chosen uniformly in [0,1]""" return [random.random() for i in range(n)] def randzoinst(n): """generate instance x\in\mathbb R^n, with each entry chosen uniformly in {1,2}""" return [random.randint(1,2) for i in range(n)] def poissoninst(n,d,lam,noiselvl=0.1): """delta separated peaks plus small noise. lam is 1.0 divided by the desired mean, exclusive d""" inst=[random.random()*noiselvl for i in range(n)] # fill with small noise i=int(round(random.expovariate(lam))) while i=without: # pick better option self.xsol[i][l]=1 # keep track of chosen elements self.xopt[i][l]=max(with_xi,without) # pick value of better option def run(self): for l in range(self.k+1): for i in range(self.n): self.update(i,l) def headval(self,l=-1): """Returns value of best solution. default value: solution may contain up to k elements""" if l==-1: l=self.k return self.opt(self.n-1,l) def tailval(self): return sum(self.x)-self.headval() def solution(self,l=-1): """Returns best solution. default value: solution may contain up to k elements""" if l==-1: l=self.k i=self.n-1 sol=[] while True: # recover solution from xsol if i<0 or l<=0: sol.sort() return sol if self.xsol[i][l]==1: sol.append(i) i=i-self.d l=l-1 else: i=i-1 class TwoSpikesDP: """dynamic programming for two spikes per Delta-interval""" def __init__(self,x,d,k): self.x=x self.d=d self.k=k self.n=len(x) self.xopt=[[[0]*(k+1) for i in range(d)] for r in range(self.n)] # the optimal values OPT([r],i,l) self.xsol=[[[0]*(k+1) for i in range(d)] for r in range(self.n)] # keep track of actual solution self.run() def __str__(self): return "DynProg for two spikes" def opt(self,r,i,l): """look up opt value for OPT([r],i,l)""" if r<0 or l<=0: return 0 if i<=0: return self.xopt[r][1][l] return self.xopt[r][i][l] def update(self,r,i,l): """update xopt according to update recursion formula""" with_xr=self.x[r]+self.opt(r-i,self.d-i,l-1) # option: choose x[r] for solution without=self.opt(r-1,i-1,l) # option: don't choose x[r] if with_xr>=without: # pick better option self.xsol[r][i][l]=1 # keep track of chosen elements self.xopt[r][i][l]=max(with_xr,without) def run(self): for r in range(self.n): for l in range(1,self.k+1): for i in range(1,self.d): self.update(r,i,l) def headval(self,l=-1): """Returns value of best solution. default value: solution may contain up to k elements""" if l==-1: l=self.k return self.opt(self.n-1,0,l) def tailval(self): return sum(self.x)-self.headval() def solution(self,l=-1): """Returns best solution. default value: solution may contain up to k elements""" if l==-1: l=self.k r=self.n-1 i=1 sol=[] while True: # recover solution from xsol # print "r/i/l: {}/{}/{}".format(r,i,l) if r<0 or l<=0: sol.sort() return sol if self.xsol[r][i][l]==1: sol.append(r) r=r-i l=l-1 i=self.d-i else: r=r-1 i=max(1,i-1) class HeadSolver: """head approximation""" def __init__(self,x,d,k,approx_scalar): self.setup(x,d,k,approx_scalar) self.run() def setup(self,x,d,k,approx_scalar): self.x=x self.d=d self.k=k self.a=approx_scalar # approximation guarantee a/(a+1) self.n=len(x) self.exact_solver=DPSolver self.max_per_block=self.a # how many elements can I pick in a block? Double this size for p=2 def __str__(self): return "HeadSolver with lambda="+str(self.a) def run(self): """run for all a+1 slices and then pick best solution""" best_result=-1 self.best_slice=-1 for i in range(self.a+1): value,block_sol,shifts,algos=self.solveslice(i) if value>best_result: best_result=value best_solinfo=(block_sol,shifts,algos) self.best_slice=i # just for interest, could delete self.__headval=best_result self.__solinfo=best_solinfo self.__tailval=sum(self.x)-self.__headval def block_sol_info(self,largest,blocknum): """determine how many elements are picked from each block""" block_sol=[0]*blocknum for (q,i,l) in largest: if q>0 and block_sol[i]1: firstblock=self.x[0:(i-1)*self.d] S=[firstblock] shifts.append(0) else: S=[] otherblocks=[self.x[j:j+self.a*self.d] for j in range(i*self.d,self.n,(self.a+1)*self.d)] S.extend(otherblocks) shifts.extend(range(i*self.d,self.n,(self.a+1)*self.d)) return S,shifts def solveslice(self,i): """generate ith slice and solve optimally""" blocks,shifts=self.slice(i) best_in_blocks=[None]*len(blocks) algos=[None]*len(blocks) for i,block in enumerate(blocks): best_in_blocks[i],algos[i]=self.blocksolve(block) value,block_sol=self.best_k_in_blocks(best_in_blocks) return value,block_sol,shifts,algos # pick best k among all the solutions def headval(self): return self.__headval def tailval(self): return self.__tailval def solution(self): block_sol,shifts,algos=self.__solinfo sol=[] for i in range(len(block_sol)): if block_sol[i]>0: s=algos[i].solution(block_sol[i]) sol.extend([index+shifts[i] for index in s]) return sol class TwoSpikesHeadSolver(HeadSolver): def __init__(self,x,d,k,approx_scalar): self.setup(x,d,k,approx_scalar) self.exact_solver=TwoSpikesDP # use TwoSpikesDP as exact solver on the blocks self.max_per_block=2*self.a # can pick up to 2a elements per block self.run() class TailSolver(HeadSolver): def __init__(self,x,d,k,approx_scalar): self.setup(x,d,k,approx_scalar) self.strong=self.strong_indices() self.run() def __str__(self): return "TailSolver with lambda="+str(self.a) def gap_indicator(self,i): gap_ind=[False]*self.n start= i*self.d gaps = [range(gapstart,min(gapstart+self.d,self.n)) for gapstart in range(start, self.n,(self.a+1)*self.d)] stronggaps = [range(max(s-self.d+1,0),min(s+self.d,self.n)) for s in self.strong] gaps.extend(stronggaps) for gap in gaps: for j in gap: gap_ind[j]=True for s in self.strong: gap_ind[s]=False # for technical reasons stronggaps above marks strong indices as gaps return gap_ind def slice(self,i): """return ith slice: gaps come from slicing as well as from strong indices""" blocks=[] shifts=[] gap_ind=self.gap_indicator(i) running_block=False for j in range(self.n): if not gap_ind[j] and not running_block: # start new block running_block=True blockstart=j shifts.append(blockstart) if gap_ind[j] and running_block: # block has stopped running_block=False blocks.append([self.x[t] for t in range(blockstart,j)]) if running_block: # if last block not yet processed blocks.append([self.x[t] for t in range(blockstart,self.n)]) return blocks,shifts def leftsum(self): ls=[0]*self.n for i in range(1,self.d): ls[i]=ls[i-1]+self.x[i-1] for i in range(self.d,self.n): ls[i]=ls[i-1]-self.x[i-self.d]+self.x[i-1] return ls def rightsum(self): # merge with leftsum rs=[0]*self.n last=self.n-1 for i in range(1,self.d): rs[last-i]=rs[last-i+1]+self.x[last-i+1] for i in range(self.d,self.n): rs[last-i]=rs[last-i+1]-self.x[last-i+self.d]+self.x[last-i+1] return rs def strong_indices(self): ls = self.leftsum() rs = self.rightsum() dom_list=[(i,self.x[i]-ls[i]-rs[i]) for i in range(self.n)] return [i for (i,si) in dom_list if si>0] ############ check correctness ########################### def is_separated(p,d,solution): """checks whether solution is p-spikes d-separated""" solution.sort() for i in range(len(solution)-p): if solution[i+p]-solution[i]k: print "Too many elements: solsize/k:{}/{}".format(len(solution),k) return False # at most k indices chosen? return is_separated(p,d,solution) def correct_headval(x,solution,headval): """does the head value correspond to the actual solution?""" return abs(sum([x[s] for s in solution])-headval)<0.00001 def test_solver(p,solver): params=[(200,d,k) for d in range(2,20) for k in range(5,20)] test_insts=[(randinst(n),n,d,k) for (n,d,k) in params] test_insts.extend([(purepoissoninst(n,1./d),n,d,k) for (n,d,k) in params]) test_insts.extend([(purepoissoninst(n,0.5/d),n,d,k) for (n,d,k) in params]) for (x,n,d,k) in test_insts: A=solver(x,d,k) if not is_solution(p,d,k,A.solution()): print "PANIC PANIC PANIC: not a solution!" return A if not correct_headval(x,A.solution(),A.headval()): print "PANIC PANIC PANIC: not correct solution value!" print "x(sol)/headval: {}/{}".format(sum(x[i] for i in A.solution()),A.headval()) return print " -> test passed" def headwrapper(a): f=lambda x,d,k:HeadSolver(x,d,k,a) f.name="HeadSolver with lambda="+str(a) f.p=1 return f def tailwrapper(a): f=lambda x,d,k:TailSolver(x,d,k,a) f.name="TailSolver with lambda="+str(a) f.p=1 return f def dpwrapper(): f=lambda x,d,k:DPSolver(x,d,k) f.name="DynProg" f.p=1 return f def twodpwrapper(): f=lambda x,d,k:TwoSpikesDP(x,d,k) f.name="Two-Spikes-DynProg" f.p=2 return f def twoheadwrapper(a): f=lambda x,d,k:TwoSpikesHeadSolver(x,d,k,a) f.name="TwoSpikesHeadSolver with lambda="+str(a) f.p=2 return f def solverlist(A=[1,2,3]): l=[dpwrapper(),twodpwrapper()] l.extend([tailwrapper(a) for a in A]) l.extend([headwrapper(a) for a in A]) l.extend([twoheadwrapper(a) for a in A]) return l def test_all_solvers(): for solver in solverlist(): print "testing whether {} returns feasible solution".format(solver.name) test_solver(solver.p,solver) def test_slicing(): """test whether [0...n-1] is sliced in such a way that each element appears a times""" n=400 x=range(n) for a in range(1,10): A=HeadSolver(x,n/10,n/10,a) slices=[A.slice(i) for i in range(a+1)] union=[item for (s,shifts) in slices for b in s for item in b] for j in range(n): if union.count(j)!=a: print "PANIC PANIC PANIC: slicing not correct!" return print "slicing test passed" def check(solver,x,d,k,correct_val): A=solver(x,d,k) if A.headval()==correct_val: print solver.name+": passed" else: print solver.name+": not passed!" print "found: {} / correct: {}".format(A.headval(),correct_val) def test_correct_sol(): print "testing for correct solution value" ones=[1 for i in range(100)] d=10 check(dpwrapper(),ones,d,10,10) check(headwrapper(1),ones,d,5,5) check(headwrapper(2),ones,d,5,5) check(headwrapper(2),ones,d,6,6) check(tailwrapper(1),ones,d,5,5) check(tailwrapper(2),ones,d,5,5) check(tailwrapper(2),ones,d,6,6) check(twodpwrapper(),ones,d,20,20) check(twoheadwrapper(1),ones,d,10,10) check(twoheadwrapper(2),ones,d,13,13) def test(): """run all test code""" test_slicing() test_all_solvers() test_correct_sol()