You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
"""Compute the Huffman encoding of a provided text as input.Outputs the encoding for each symbol."""importsysimportheapqfromcollectionsimportnamedtuplefromcollectionsimportCounterNode=namedtuple("Node", "f s ascii children")
defhuffman(freq):
pq= [Node(f, s, ord(s), None) for (s, f) infreq.items()]
heapq.heapify(pq)
whilelen(pq) >1:
v1=heapq.heappop(pq)
v2=heapq.heappop(pq)
v=Node(v1.f+v2.f, "", -1, children=(v1, v2))
heapq.heappush(pq, v)
returnheapq.heappop(pq)
defdfs(root, prefix=""):
ifnotroot.children:
yield (root, prefix)
returnyieldfromdfs(root.children[0], prefix+"0")
yieldfromdfs(root.children[1], prefix+"1")
def get_codes(text):
    """Return (leaf, code) pairs for every symbol of text, shortest code first."""
    tree = huffman(Counter(text))
    pairs = list(dfs(tree))
    pairs.sort(key=lambda pair: len(pair[1]))
    return pairs
if__name__=="__main__":
text=sys.stdin.read().strip()
codes=get_codes(text)
fornode, codeincodes:
symbol=node.sifsymbol=="\n":
symbol="\\n"ifsymbol=="\t":
symbol="\\t"ifnode.ascii==13:
symbol="␍"ifnode.ascii==65279:
symbol="‗"lhs=f"{symbol.ljust(2)} ({str(node.ascii).rjust(4)})"print(lhs.ljust(10), "\t", len(code), "\t", code)
Stable Matching
"""The Gale–Shapley algorithm for Stable Matching. The algorithm runsin linear time in the total size of preferences, or, $O(n^2)$ where $n$is the number of "hospitals"."""defstable_matching(hospital_preferences, student_preferences):
students= [sforsinstudent_preferences]
hospitals= [hforhinhospital_preferences]
proposals= {h: 0forhinhospitals}
unmatched_hospitals= [hforhinhospitals]
student= {h: Noneforhinhospitals}
hospital= {s: Noneforsinstudents}
inrank= {s: {} forsinstudents} # maps s to each hospital's s-rankingforsinstudents:
foridx, hinenumerate(student_preferences[s]):
inrank[s][h] =idxwhileunmatched_hospitals:
h=unmatched_hospitals.pop()
nxt=proposals[h] # we could pop here insteads=hospital_preferences[h][nxt]
proposals[h] +=1# h proposes to its best student not yet proposed toifnothospital[s]:
# s is availablehospital[s] =hstudent[h] =selse:
sh=hospital[s]
rank_sh=inrank[s][sh]
rank_h=inrank[s][h]
ifrank_h<rank_sh:
# s dumps sh for hhospital[s] =hstudent[sh] =Nonestudent[h] =sunmatched_hospitals.append(sh)
else:
# s rejectsunmatched_hospitals.append(h)
returnstudentdef_generate_instance(n):
fromrandomimportsampleasshufflehospitals= [f"h{i}"foriinrange(n)]
students= [f"s{i}"foriinrange(n)]
hospital_preferences= {h: students[:n] forhinhospitals[:n]}
student_preferences= {s: hospitals[:n] forsinstudents[:n]}
forhinhospitals[:n]:
hospital_preferences[h] =shuffle(hospital_preferences[h], n)
forsinstudents[:n]:
student_preferences[s] =shuffle(student_preferences[s], n)
returnhospital_preferences, student_preferencesif__name__=="__main__":
hospital_preferences, student_preferences=_generate_instance(20)
M=stable_matching(hospital_preferences, student_preferences)
forhinM:
print(f"Hospital {h} + Student {M[h]}")
02-graphs
Bipartite Matching
"""Compute a maximum matching in an unweighted bipartite graph using DFS tofind augmenting paths. Implements the classic alternating-path approach forbipartite matching."""defmax_bipartite_matching(G, A, B):
matchA= {u: NoneforuinA}
matchB= {v: NoneforvinB}
defdfs(u, seen):
forvinG[u]:
ifvinseen:
continueseen.add(v)
ifmatchB[v] isNoneordfs(matchB[v], seen):
matchA[u] =vmatchB[v] =ureturnTruereturnFalseforuinA:
dfs(u, set())
returnmatchA, matchB
DFS Tree
"""Run DFS and get a DFS tree"""fromcollectionsimportdefaultdictasDfromcollectionsimportnamedtupleasTDiGraph=T("DiGraph", "V E")
defoutneighbors(G, v):
# Yes we use edge list, the worst oneforx, yinG.E:
ifv==x:
yieldypre=D(int)
post= {}
time=0tree_edges=set()
defdfs(G, node):
globaltimetime+=1pre[node] =timeforvinoutneighbors(G, node):
ifnotpre[v]:
tree_edges.add((node, v))
dfs(G, v)
time+=1post[node] =timedefdfs_tree(G, pre, post, tree_edges):
edge_type= {}
foredgeinG.E:
u, v=edgeifedgeintree_edges:
edge_type[edge] ="tree"elifpre[u] <pre[v] <post[v] <post[u]:
edge_type[edge] ="forward"elifpre[v] <pre[u] <post[u] <post[v]:
edge_type[edge] ="back"elifpre[v] <post[v] <pre[u] <post[u]:
edge_type[edge] ="cross"returnedge_typedeftopological_sort(G, post):
toposort= [-1] * (2*len(G.V) +1)
forvinG.V:
toposort[post[v]] =vreturn [xforxinreversed(toposort) ifx>=0]
"""Kahn's algorithm produces a topological order of a DAG by repeatedly
removing vertices with zero in-degree and updating their neighbors' in-degrees
until none remain."""
from collections import deque


def kahns(G):
    """Yield vertices of G in topological order; raise ValueError on a cycle.

    G must expose G.nodes (iterable of vertices) and G[u] (neighbors of u).
    """
    indegree = {u: 0 for u in G.nodes}
    for u in G.nodes:
        for v in G[u]:
            indegree[v] += 1
    queue = deque(u for u in G.nodes if indegree[u] == 0)
    emitted = 0
    while queue:
        u = queue.popleft()
        yield u
        emitted += 1
        for v in G[u]:
            indegree[v] -= 1
            if indegree[v] == 0:
                queue.append(v)
    # Vertices left unemitted can only be part of (or behind) a cycle.
    if emitted != len(G.nodes):
        raise ValueError("Graph is not a DAG")
Kosaraju's Algorithm
"""A strongly connected component (SCC) is a maximal set of vertices whereevery vertex can reach every other by directed paths. Kosaraju's algorithmfinds all SCCs by doing a DFS to record finish times, reversing all edges, thenrunning DFS again in reverse finish order."""defkosaraju(G):
vis, order=set(), []
defdfs(u, graph, collect):
vis.add(u)
forvingraph[u]:
ifvnotinvis:
dfs(v, graph, collect)
collect.append(u)
foruinG:
ifunotinvis:
dfs(u, G, order)
R= {u: [] foruinG}
foruinG:
forvinG[u]:
R[v].append(u)
vis.clear()
whileorder:
u=order.pop()
ifunotinvis:
comp= []
dfs(u, R, comp)
yieldcomp
RRT
"""Grow a rapidly-exploring random tree (RRT) toward a target while avoidingobstacles. Expands the tree incrementally by sampling feasible edges until thegoal region is reached or iterations are exhausted."""defrrt(start, target, obstacles, n=1, tree=None):
iftreeisNone:
tree=Tree(start, set([start]), set()) # rapidly exploring random tree (RRT)foriinrange(n):
edge=sample_tree_node(tree, obstacles)
ifedge:
q, v=edgetree.V.add(q)
tree.E.add((q, v))
ifdist(q, target) <CLOSE:
returntree, Truereturntree, False
Topological Sort with DFS
"""Use postorder DFS to get a reversed topological ordering."""defdfs_toposort(G):
visited=set()
order= []
defdfs(u):
ifunotinvisited:
forvinG[u]:
dfs(v)
visited.add(u)
order.append(u)
foruinG.nodes:
ifunotinvisited:
dfs(u)
yieldfromreversed(order)
"""Maximum consecutive sum in an array with possibly negative values."""defconsecutive_sum(array, idx=0, con_sum=0, global_max=0):
ifidx>=len(array):
returnglobal_maxcopt=max(array[idx], array[idx] +con_sum)
gopt=max(global_max, copt)
returnconsecutive_sum(array, idx+1, copt, gopt)
print(consecutive_sum([4, 0, 3, -8, 1, 4, -2, -10]))
Longest Non-negative Subarray
"""Find the length of the longest contiguous subarray with a non-negative sum.Uses prefix sums and two-pointer scanning over forward minima and reversemaxima."""deflongest_non_negative(data):
neginf=sum(abs(e) foreindata) *-1000# -inftydata= [neginf] +data+ [neginf]
presum= [0]
foridx, valinenumerate(data):
presum.append(val+presum[idx])
max_rev= [0for_inpresum]
max_rev[-1] =presum[-1]
foriinrange(len(presum) -2, -1, -1):
max_rev[i] =max(presum[i], max_rev[i+1])
min_fwd= [0for_inpresum]
foriinrange(1, len(presum)):
min_fwd[i] =min(presum[i], min_fwd[i-1])
lo=0hi=0mx=0whileTrue:
lv=min_fwd[lo]
rv=max_rev[hi]
mx=max(mx, (hi-lo-1))
ifrv-lv>=0:
hi+=1ifhi>=len(max_rev):
breakelse:
lo+=1iflo>=len(min_fwd):
breakreturnmx
Monotonic Queue
"""Monotonic queue for $O(1)$-amortized sliding-window min/max queries.Maintains a deque of candidate indices in monotone order as the window moves.Use it to compute all window minima/maxima in overall $O(n)$ time."""fromcollectionsimportdequedefsliding_window_min(arr, k):
"""Return list of minima of each length-k window of arr."""ifk<=0:
raiseValueError("k must be >= 1")
n=len(arr)
ifk>n:
return []
q=deque() # stores indices; arr[q[0]] is current minout= []
fori, xinenumerate(arr):
# drop indices that left the windowwhileqandq[0] <=i-k:
q.popleft()
# maintain increasing values in dequewhileqandarr[q[-1]] >=x:
q.pop()
q.append(i)
# window is formedifi>=k-1:
out.append(arr[q[0]])
returnoutdefsliding_window_max(arr, k):
"""Return list of maxima of each length-k window of arr."""ifk<=0:
raiseValueError("k must be >= 1")
n=len(arr)
ifk>n:
return []
q=deque() # stores indices; arr[q[0]] is current maxout= []
fori, xinenumerate(arr):
whileqandq[0] <=i-k:
q.popleft()
# maintain decreasing values in dequewhileqandarr[q[-1]] <=x:
q.pop()
q.append(i)
ifi>=k-1:
out.append(arr[q[0]])
returnoutif__name__=="__main__":
arr= [1, 3, -1, -3, 5, 3, 6, 7]
k=3mins=sliding_window_min(arr, k)
maxs=sliding_window_max(arr, k)
print("arr :", arr)
print("k :", k)
print("mins:", mins) # [-1, -3, -3, -3, 3, 3]print("maxs:", maxs) # [3, 3, 5, 5, 6, 7]
Prefix Sum
"""Compute the prefix sum of a given list as input."""fromsysimportstdinasinpa= [int(x) forxininp.readline().split()]
b= []
foridx, valinenumerate(a):
b.append(valifidx==0elseval+b[idx-1])
print(b)
Sliding Window
"""Sliding window to find best $k$ sized window in linear time."""data= [3.5, 7.5, 8.0, 5.7, 3.1, 4.2, 7.2, 0.1, 3.4, 1.2, -4]
k=3acc=sum(data[:k])
best=accwinstart=0forwinstartinrange(1, len(data) -k+1):
acc+=-data[winstart-1] +data[winstart+k-1]
ifacc>best:
best=accprint(best)
04-dynamic-programming
Bellman Ford
"""Single-source shortest paths (SSSP) with negative edges can be solved byBellman–Ford, which uses dynamic programming over path lengths to relax edgesup to $n-1$ times. If a further relaxation still decreases a distance, itindicates a negative-weight cycle reachable from the source."""defbellman_ford(n, G, s):
"""Return (dist, True) if successful, and (dist, False) if we have detected a negative cycle. """dist= [float("inf")] *ndist[s] =0for_inrange(n-1):
foru, v, winG.edges:
dist[v] =min(dist[v], dist[u] +w)
foru, v, winG.edges:
ifdist[u] +w<dist[v]:
dist, Falsereturndist, True
"""SOS DP / Fast Zeta Transform over subsets (submasks).This module provides the classic *subset zeta transform* (a.k.a. SOS DP step).Given an array f indexed by bitmasks in {0..2^n-1}, the subset zeta transformproduces f' where: f'[mask] = sum_{sub ⊆ mask} f[sub].This is useful when you want, for every mask, the sum/aggregation over all itssubmasks, but you want all answers at once in O(n·2^n) rather than doing anO(3^n) total amount of submask iteration."""defzeta_submasks(f, n):
N=1<<nforiinrange(n):
bit=1<<iformaskinrange(N):
ifmask&bit:
f[mask] +=f[mask^bit]
returnfif__name__=="__main__":
n=3w= [0] * (1<<n)
w[0b001] =5# {0}w[0b101] =2# {0,2}w[0b110] =7# {1,2}g=w[:]
zeta_submasks(g, n)
print("g[0b101] =", g[0b101])
"""APSP (All-Pairs Shortest Paths) is the problem of finding the shortest pathdistances between every pair of vertices in a weighted graph.Floyd–Warshall is a classic dynamic programming algorithm for APSP.By updating `dist[i][j] = min(dist[i][j], dist[i][k] + dist[k][j])` for eachintermediate vertex `k`, it computes shortest paths for all pairs in $O(n^3)$time."""deffloyd_warshall(dist):
n=len(dist)
forkinrange(n):
foriinrange(n):
forjinrange(n):
ifdist[i][k] +dist[k][j] <dist[i][j]:
dist[i][j] =dist[i][k] +dist[k][j]
returndist
"""Given a set of $n$ $(x, y)$ points, compute the least squares for the dataset. In addition, create a data structure that can answer query`least_squares(i, j)` of all points in `data[i:j]`. Using dynamic programmingwe can compute the entire data structure in time $O(n^2)$."""fromcollectionsimportnamedtupleRegressionState=namedtuple("RegressionState", "a b n sum_x sum_y sum_xy sum_x2 a_nom denom b_nom")
defregression(x, y, state=None):
ifstateisNoneorstate.n==0:
n=1sum_x, sum_y=x, ysum_xy, sum_x2=x*y, x*xelse:
n=state.n+1sum_x=state.sum_x+xsum_y=state.sum_y+ysum_xy=state.sum_xy+x*ysum_x2=state.sum_x2+x*xa_nom=n*sum_xy-sum_x*sum_ydenom=n*sum_x2-sum_x*sum_xb_nom=sum_y*sum_x2-sum_x*sum_xyifdenom==0:
a=0.0b=sum_y/nelse:
a=a_nom/denomb=b_nom/denomreturnRegressionState(a, b, n, sum_x, sum_y, sum_xy, sum_x2, a_nom, denom, b_nom)
defbuild_dp(points):
N=len(points)
DP= {}
foriinrange(N):
DP[(i, i)] =regression(*points[i])
foriinrange(N):
forjinrange(i+1, N):
DP[(i, j)] =regression(*points[j], DP[(i, j-1)])
returnDP# RUNNING THE CODEimportrandomdefgenerate_points(N):
points= []
forxinrange(N):
y=round(2+x+ (random.randint(-200, 200) *0.12), 3)
points.append((1+x, y))
returnpointsdefaxb(state, r=5):
"""Print s.a and s.b as "a·x+b"."""a=round(state.a, r)
b=round(state.b, r)
ifb>=0:
returnf"{a}·x + {b} ({state.n} points)"returnf"{a}·x - {-b} ({state.n} points)"RegressionState.__str__=axbif__name__=="__main__":
importsysiflen(sys.argv) !=2:
exit("Usage: ./regression.py <N>")
N=int(sys.argv[1])
random.seed(1000*N%2**20-1)
points=generate_points(N)
DP=build_dp(points)
line1=DP[(0, N-1)]
print(line1)
"""Basic implementation for SQRT decomposition. The SQRT data structure cananswer range queries such as sum, max, min in time $O(\\sqrt n)$ and can do updateelement in $O(\\sqrt n)$ as well.In theory, this data structure can take any [associativeoperation](https://en.wikipedia.org/wiki/Associative_property) (e.g., gcd,lcm), and the elements can even be higher order structures such as matriceswith the operation being matrix multiplication. Indeed, you can take elementsover $\\text{GL}_n(\\mathbb{C})$, and answer such queries."""importmathclassSQRT:
"""The SQRT data structure. The SQRT data structure can answer range queries such as sum, max, min in time $O(\\sqrt n)$ and can do update element in $O(\\sqrt n)$ as well. The update element functionality makes this a data structure that outperforms prefix sum in some cases. """def_update(self, block):
ifblock<0:
returnself._block_data[block] = {
"sum": sum(self._blocks[block]),
"min": min(self._blocks[block]),
"max": max(self._blocks[block]),
}
def_initialize(self, data):
block=-1foridx, einenumerate(data):
if (idx%self._block_size) ==0:
self._update(block)
block+=1self._blocks.append([])
self._blocks[block].append(e)
ifblocknotinself._block_data:
# the trailing blockself._update(block)
def__init__(self, data):
"""Create a SQRT decomposition of iterable data. Complexity: O(n). """d= [eforeindata]
ifnotd:
raiseValueError("SQRT undefined for empty set")
self._glob_min=min(d) -1self._glob_max=max(d) +1self._len=len(d)
self._blocks= []
self._block_size=int(math.sqrt(len(d)))
self._block_data= {}
self._initialize(d)
def__len__(self):
returnself._lendef_pos(self, idx):
returnidx//self._block_size, idx%self._block_sizedef__getitem__(self, coord):
"""Get a statistics dictionary for either an index or a slice. SQRT(lst)[l:r] returns a dictionary with keys "sum", "min", and "max", mapping to their respective values for the given range. Complexity: $O(\\sqrt n)$. """ifisinstance(coord, int):
b, l=self._pos(coord)
returnself._blocks[b][l]
ifisinstance(coord, slice):
ifcoord.stepisnotNone:
raiseValueError("SQRT cannot stride")
coord= (coord.start, coord.stop)
ifnotisinstance(coord, tuple):
raiseValueError(f"cannot unpack {coord} in sqrt[(l, r)]")
iflen(coord) !=2:
raiseValueError(f"size mismatch for {coord} in sqrt[(l, r)]")
l, r=coordl_block, l_loc=self._pos(l)
r_block, r_loc=self._pos(r)
ifl_block==r_block:
# special case when l and r within same blockview=self._blocks[l_block][l_loc:r_loc]
return {
"sum": sum(view),
"min": min(view),
"max": max(view),
}
stats= {
"sum": 0,
"min": self._glob_max,
"max": self._glob_min,
}
forl_idxinrange(l_loc, self._block_size):
e=self._blocks[l_block][l_idx]
stats["sum"] +=estats["min"] =min(stats["min"], e)
stats["max"] =max(stats["max"], e)
forbinrange(l_block+1, r_block):
stats["sum"] +=self._block_data[b]["sum"]
stats["min"] =min(stats["min"], self._block_data[b]["min"])
stats["max"] =max(stats["max"], self._block_data[b]["max"])
forr_idxinrange(r_loc):
e=self._blocks[r_block][r_idx]
stats["sum"] +=estats["min"] =min(stats["min"], e)
stats["max"] =max(stats["max"], e)
returnstatsdef__setitem__(self, idx, val):
"""Update index idx to be val. Complexity: $O(\\sqrt n)$. """self._glob_min=min(self._glob_min, val-1)
self._glob_max=max(self._glob_max, val+1)
block, loc=self._pos(idx)
self._blocks[block][loc] =valself._update(block)
def__str__(self):
"""Return a verbose string representation of the SQRT decomposition."""retval="SQRT(\n"foridx, eltinenumerate(self._blocks):
s_i=str(idx).ljust(3)
s_o=str(self._block_data[idx]).ljust(4*12+4)
s_d="["+", ".join(str(x).rjust(5) forxinelt) +"]"retval+=f" {s_i}{s_o}{s_d}\n"returnretval+")"def__repr__(self):
data= [str(self[i]) foriinrange(len(self))]
returnf"SQRT([{', '.join(data)}])"
06-divide-and-conquer
Closest Pair
"""A divide and conquer algorithm for computing a closest pair of points in aset of $n$ points in the plane in time $O(n \\log n)$. It outputs both theirdistance and the pair itself."""importitertoolsasITimportmathfromcollectionsimportnamedtupleasTPoint=T("Point", "x y")
Sol=T("Sol", "delta pair")
defbruteforce(points):
returnmin(Sol(math.hypot(a.x-b.x, a.y-b.y), (a, b)) for (a, b) inIT.combinations(points, 2))
defclosest_pair(X, Y):
iflen(X) <=5:
returnbruteforce(X)
pivot=X[len(X) //2]
Xl= [pforpinXifp<=pivot]
Yl= [pforpinYifp<=pivot]
Xr= [pforpinXifp>pivot]
Yr= [pforpinYifp>pivot]
OPT=min(closest_pair(Xl, Yl), closest_pair(Xr, Yr))
S= [pforpinYifabs(p.x-pivot.x) <OPT.delta]
iflen(S) >1:
OPT=min(OPT, min(bruteforce(IT.islice(S[i:], 6)) foriinrange(len(S) -1)))
returnOPTdefclosest(points):
X=sorted(points)
Y=sorted(points, key=lambdap: (p.y, p.x))
returnclosest_pair(X, Y)
# DriverimportrandomN=100P= [Point(random.random() *1000, random.random() *1000) for_inrange(N)]
delta, (p1, p2) =closest(P)
print("delta:", round(delta, 3))
"""Knuth–Morris–Pratt (KMP) algorithm for pattern matching."""defmatch(text, pattern):
"""Yield indices i where text[i:len(P)] == P"""foriinrange(len(text)):
iftext[i : i+len(pattern)] ==pattern:
yieldidefbf(text, pattern):
"""Return the indices i where text[i:len(P)] == P"""iflen(text) <=len(pattern):
return [0] iftext==patternelse []
indices= []
foriinrange(len(text) -len(pattern) +1):
matches=Trueforjinrange(len(pattern)):
iftext[i+j] !=pattern[j]:
matches=Falsebreakifmatches:
indices.append(i)
returnindicesdeflongest_prefix_suffix(pattern):
"""Return lps table. `lps[i]` is where to start matching if first mismatch at i+1, or length of longest suffix that is also a prefix of the string from 0 to i. """n, k=0, len(pattern)
lps= [0] *kidx=1whileidx<k:
print(idx, n, pattern[idx], pattern[n])
ifpattern[idx] ==pattern[n]:
n+=1lps[idx] =nelifn!=0:
n=lps[n-1]
else:
lps[idx] =0idx+=1returnlpsdefkmp(text, pattern, lps=None):
"""Yield indices idx where text[idx:idx+N] == P"""N, M=len(text), len(pattern)
iflpsisNone:
lps=longest_prefix_suffix(pattern)
txt_idx=0pat_idx=0whiletxt_idx<N:
ifpattern[pat_idx] ==text[txt_idx]:
txt_idx+=1pat_idx+=1ifpat_idx==M:
yieldtxt_idx-pat_idxpat_idx=lps[pat_idx-1]
# mismatch after pat_idx matcheseliftxt_idx<Nandpattern[pat_idx] !=text[txt_idx]:
# Do not match lps[0..lps[pat_idx-1]] characters,# they will match anywayifpat_idx!=0:
pat_idx=lps[pat_idx-1]
else:
txt_idx+=1
"""Factorize a number into prime factors."""frommathimportsqrtdeffactoring(n):
whilen%2==0:
yield2n//=2foriinrange(3, int(sqrt(n)) +1, 2):
whilen%i==0:
yieldin//=iifn>1:
yieldnN=2310*510510print(f"factors({N}) =", *factoring(N))
Sieve
"""The Sieve of Eratosthenes — an ancient algorithm for finding all prime numbers up to any given limit."""defsieve(n):
ifn<2:
returnprime= [True] * (n+1)
prime[0] =Falseprime[1] =Falseforiinrange(2, n+1):
ifnotprime[i]:
continueyieldiforjinrange(i+i, n, i):
prime[j] =Falseprint(*sieve(101))