Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
933e1aa
WIP: step1: aha:network default
invisig0th Jun 27, 2024
cba0bfe
wip
invisig0th Jun 27, 2024
8697e4a
wip
invisig0th Jun 28, 2024
c9a9a9d
wip
invisig0th Jun 28, 2024
c15ba7c
wip
invisig0th Jun 28, 2024
9587d08
Merge branch 'master' into visi-aha-defnet
invisig0th Jun 28, 2024
9ac5fbf
wip
invisig0th Jun 29, 2024
0f6cdd6
Merge branch 'visi-aha-defnet' of ssh://github.com/vertexproject/syna…
invisig0th Jun 29, 2024
4b69909
wip
invisig0th Jul 1, 2024
f1d1cad
wip
invisig0th Jul 1, 2024
0a52e07
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 5, 2024
9c18de4
wip
invisig0th Jul 5, 2024
a1664e5
wip
invisig0th Jul 5, 2024
df24baa
wip
invisig0th Jul 5, 2024
b5a543e
wip
invisig0th Jul 5, 2024
d891c82
wip
invisig0th Jul 5, 2024
f31f1af
wip
invisig0th Jul 5, 2024
42697fa
wip
invisig0th Jul 5, 2024
7e2fb64
wip
invisig0th Jul 5, 2024
1d5e0df
wip
invisig0th Jul 5, 2024
ff884dc
wip
invisig0th Jul 5, 2024
562cea7
wip
invisig0th Jul 5, 2024
7e59310
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 5, 2024
0fc3257
wip
invisig0th Jul 5, 2024
27cfd9f
improve coverage
invisig0th Jul 5, 2024
b9bb3f0
wip
invisig0th Jul 6, 2024
92b498d
wip
invisig0th Jul 6, 2024
40c5e04
wip
invisig0th Jul 7, 2024
2c063ed
wip
invisig0th Jul 7, 2024
08d3534
wip
invisig0th Jul 7, 2024
9750935
wip
invisig0th Jul 8, 2024
4f13a21
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 8, 2024
2ee4285
duh
invisig0th Jul 8, 2024
1e715c9
wip
invisig0th Jul 8, 2024
b01d3bf
speed up tests
invisig0th Jul 8, 2024
f9489ec
WIP: AHA Leader Terms
invisig0th Jul 9, 2024
be760d2
Update docs/synapse/deploymentguide.rst
invisig0th Jul 9, 2024
f0f8a4f
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 9, 2024
36acd8d
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 9, 2024
a7c5e3d
wip
invisig0th Jul 9, 2024
90b7959
wip
invisig0th Jul 9, 2024
25af8b6
wip
invisig0th Jul 9, 2024
e4dea7b
make timeout message a bit prettier
invisig0th Jul 10, 2024
c7f1973
Update synapse/lib/aha.py
invisig0th Jul 10, 2024
f62ee42
Do container branch build
vEpiphyte Jul 10, 2024
03f7983
Merge branch 'visi-aha-defnet' into visi-aha-term
invisig0th Jul 11, 2024
21314af
wip
invisig0th Jul 11, 2024
5954fa4
Update docker test script for required value.
vEpiphyte Jul 12, 2024
6b0f3a9
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 12, 2024
c7145e3
wip
invisig0th Jul 12, 2024
aa53e1c
Merge branch 'visi-aha-defnet' into visi-aha-term
invisig0th Jul 12, 2024
4282ac8
wip
invisig0th Jul 12, 2024
de05d16
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 15, 2024
f2c4fae
wip
invisig0th Jul 15, 2024
0c55fd9
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 15, 2024
ef41b3e
wip
invisig0th Jul 15, 2024
3badfdc
wip
invisig0th Jul 15, 2024
4533440
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 15, 2024
cbc39e3
Fix fstrings
vEpiphyte Jul 15, 2024
020f7a1
Merge branch 'master' into visi-aha-defnet
vEpiphyte Jul 15, 2024
7fb5d58
Fix fstring for clone; add --only-url
vEpiphyte Jul 15, 2024
0211b3e
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 17, 2024
dc88406
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 17, 2024
e9e4294
Merge branch 'visi-aha-defnet' of ssh://github.com/vertexproject/syna…
invisig0th Jul 17, 2024
9337d83
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 22, 2024
efa7d02
wip
invisig0th Jul 23, 2024
03b4b71
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 24, 2024
fe2e583
wip
invisig0th Jul 24, 2024
2f06751
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 25, 2024
95774e6
Merge branch 'master' into visi-aha-defnet
vEpiphyte Jul 26, 2024
21ab327
Visi aha defnet epiphyte (#3823)
vEpiphyte Jul 26, 2024
dc286f4
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 29, 2024
0dd367b
Merge branch 'master' into visi-aha-defnet
vEpiphyte Jul 30, 2024
7a91deb
wip
invisig0th Jul 30, 2024
728da28
wip
invisig0th Jul 30, 2024
37983b2
wip
invisig0th Jul 30, 2024
8d07930
Remove logger statement about username hinting on aha lookup
vEpiphyte Jul 31, 2024
891fc22
Restore ahaname logging during promote/handoff API calls when it is a…
vEpiphyte Jul 31, 2024
a2d5d0b
Tweak flat network docs slightly
vEpiphyte Jul 31, 2024
252599d
changelog entries
invisig0th Jul 31, 2024
e379042
Merge branch 'visi-aha-defnet' of ssh://github.com/vertexproject/syna…
invisig0th Jul 31, 2024
36d8054
Merge branch 'master' into visi-aha-defnet
invisig0th Jul 31, 2024
26c5127
Update changes/6ebc22454e67c26ce57ea4533441c9fc.yaml
invisig0th Jul 31, 2024
55d8394
Update docs/synapse/deploymentguide.rst
invisig0th Jul 31, 2024
7da0c6e
Update docs/synapse/deploymentguide.rst
invisig0th Jul 31, 2024
e9af9a3
Update synapse/lib/cell.py
invisig0th Jul 31, 2024
502d23d
wip
invisig0th Aug 1, 2024
6368eb1
Merge branch 'master' into visi-aha-defnet
invisig0th Aug 1, 2024
37a7a85
Update docs/synapse/deploymentguide.rst
invisig0th Aug 2, 2024
650a891
Merge branch 'master' into visi-aha-defnet
invisig0th Aug 5, 2024
221c823
wip
invisig0th Aug 5, 2024
6cb8bf6
merge from branch
invisig0th Aug 6, 2024
d9795aa
merge from master
invisig0th Aug 6, 2024
052e063
wip
invisig0th Aug 6, 2024
6e4d89f
wip
invisig0th Aug 6, 2024
adbe5b0
merge from master
invisig0th Jan 31, 2025
d1e0fdb
wip
invisig0th Jan 31, 2025
cd2db86
wip
invisig0th Jan 31, 2025
f555ea4
wip
invisig0th Feb 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions synapse/lib/aha.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@
}
provSvcSchema = s_config.getJsValidator(_provSvcSchema)

STATE_LEAD = 0 # lead...
STATE_FOLLOW = 1 # follow...
STATE_INVALID = 2 # or get out of the way!

class AhaProvisionServiceV1(s_httpapi.Handler):

Expand Down Expand Up @@ -367,6 +370,15 @@ async def clearAhaUserEnrolls(self):
'''
return await self.cell.clearAhaUserEnrolls()

async def mayLeadTerm(self, iden, name, term, nexs):
network = self.cell.conf.get('aha:network')
# If we could register the service, we can lead the service...
await self._reqUserAllowed(('aha', 'service', 'add', network, name))
return await self.cell.mayLeadTerm(iden, name, term, nexs)

async def getLeadTerm(self, iden):
return await self.cell.getLeadTerm(iden)

@s_cell.adminapi()
async def clearAhaClones(self):
'''
Expand Down Expand Up @@ -579,6 +591,7 @@ async def _initCellBoot(self):
async def initServiceStorage(self):

self.features['callpeers'] = 1
self.features['leadterms'] = 1

dirn = s_common.gendir(self.dirn, 'slabs', 'jsonstor')

Expand All @@ -596,6 +609,8 @@ async def fini():
self.slab.initdb('aha:provs')
self.slab.initdb('aha:enrolls')

self.slab.initdb('aha:leadterms')

self.slab.initdb('aha:clones')
self.slab.initdb('aha:servers')

Expand Down Expand Up @@ -1407,6 +1422,94 @@ async def signUserCsr(self, csrtext, signas=None):

return self.certdir._certToByts(cert).decode()

async def getLeadTerm(self, iden):
'''
Get the current leader term for the specified service cluster.
'''
lkey = s_common.uhex(termdef.get('iden'))
byts = self.slab.get(b'\x00' + lkey, db='aha:leadterms')
if byts is not None:
return s_msgpack.un(byts)

@s_cell.from_leader
async def setLeadTerm(self, termdef):
'''
Set the current leader term for the specified service cluster.
'''
termdef['time'] = s_common.now()
s_schema.reqValidLeadTerm(termdef)
return await self._push('aha:lead:set', termdef)

@s_nexus.Pusher.onPush('aha:lead:set')
async def _setLeadTerm(self, termdef):
s_schema.reqValidLeadTerm(termdef)
lkey = s_common.uhex(termdef.get('iden'))
self.slab.put(b'\x00' + lkey, s_msgpack.en(termdef), db='aha:leadterms')

@s_cell.from_leader
async def nextLeadTerm(self, iden, name, nexs):
'''
Force a change in leadership for the given cluster iden.
'''
async with self.nexslock:

term = 0
leadterm = self.getLeadTerm(iden)
if leadterm is not None:
term = leadterm.get('term') + 1

newterm = {
'iden': iden,
'name': name,
'term': term,
'nexs': nexs,
}
return await self.setLeadTerm(leadterm)

@s_cell.from_leader
async def mayLeadTerm(self, iden, name, term, nexs):
'''
Determine if the caller is still the valid leader.
Returns: (<status>, <termdef>) tuple.

The <status> may be one of the following values:
STATE_LEAD: you are the leader
STATE_FOLLOW: you must become a follower
STATE_INVALID: you are divergent and must reprovision.
'''

# we hold the nexus lock so this may only be run on the leader
async with self.nexslock:

leadterm = await self.getLeadTerm(iden)
if leadterm is None:
leadterm = {
'iden': iden,
'name': ahaname,
'term': term,
'nexs': nexs,
}
await self.setLeadTerm(leadterm)
return (STATE_LEAD, leadterm)

# if we were the last known leader, we can jump right in
if leadterm.get('name') == ahaname:
return (STATE_LEAD, leadterm)

# has someone else taken the lead in our absense?
if leadterm.get('term') > term:

# if someone else was promoted and their nexs index
# was less than ours at the time, we are divergent :(
if leadterm.get('nexs') < nexs:
return (STATE_INVALID, leadterm)

# if they took over but had an equal nexs index we can
# become a follower...
return (STATE_FOLLOW, leadterm)

return (STATE_INVALID, leadterm)

async def getAhaUrls(self, user='root'):

# for backward compat...
Expand Down
148 changes: 121 additions & 27 deletions synapse/lib/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,7 @@ async def fini():

# for runtime cell configuration values
self.slab.initdb('cell:conf')
self.slab.initdb('cell:meta')

self._sslctx_cache = s_cache.FixedCache(self._makeCachedSslCtx, size=SSLCTX_CACHE_SIZE)

Expand Down Expand Up @@ -1371,6 +1372,17 @@ async def fini():
# phase 5 - service networking
await self.initServiceNetwork()

def getCellMeta(self, name, defv=None):
byts = self.slab.get(name.encode(), s_msgpack.en(valu), db='cell:meta')
if byts is not None:
return s_msgpack.un(byts)
return defv

def setCellMeta(self, name, valu):
# NOTE: these changes are NOT nexus enabled!
self.slab.put(name.encode(), s_msgpack.en(valu), db='cell:meta')
return valu

async def _storCellHiveMigration(self):
logger.warning(f'migrating Cell ({self.getCellType()}) info out of hive')

Expand Down Expand Up @@ -1738,31 +1750,95 @@ async def _runSysctlLoop(self):

await self.waitfini(self.SYSCTL_CHECK_FREQ)

async def _askAhaToLead(self, proxy):

while not self.isfini:

try:
proxy = self.ahaclient.proxy(timeout=4)
except Exception as e:
logger.error(f'Error getting AHA proxy to check leadership: {e}')
continue

if not proxy._hasTeleFeat('leadterms'):
logger.warning('AHA Server does not support tracking leadership terms. Please update!')
return

async with self.nexslock:

name = self.conf.get('aha:name')
nexs = self.getNexsIndx()
term = self.getCellMeta('aha:term', 0)

state, leadterm = await proxy.mayLeadTerm(self.iden, name, term, nexs)

realterm = leadterm.get('term')
if realterm > term or self.getCellMeta('aha:term') is None:
self.setCellMeta('aha:term', realterm)

if state == s_aha.STATE_LEAD:
await self.setMirror(None)
return

if state == s_aha.STATE_FOLLOW:
lead = leadterm.get('name')
user = self.conf.get('aha:user', 'root')
await self.setMirror(f'aha://{user}@{lead}')
return

if state == s_aha.STATE_INVALID:
await self._termStateInvalid()
return

async def _termStateInvalid(self):
# hook point for enterprise behavior
logger.error('Leadership schism detected!')
logger.error('See: FIXME DOCS URL')
# TODO: should we allow an ENV var based override?
# TODO: should we find the leader and re-provision?
await self.fini()

def getAhaRegistry(self):

urls = self.conf.get('aha:registry')
if isinstance(urls, str):
return (urls,)

if isinstance(urls, list):
urls = tuple(urls)

return urls

def setAhaRegistry(self, urls):
self.modCellConf({'aha:registry': urls})
self.ahaclient.setBootUrls(urls)

async def _initAhaRegistry(self):

ahaurls = self.conf.get('aha:registry')
ahaurls = self.getAhaRegistry()
if ahaurls is not None:

await s_telepath.addAhaUrl(ahaurls)
if self.ahaclient is not None:
await self.ahaclient.fini()

async def onlink(proxy):

ahauser = self.conf.get('aha:user', 'root')

oldurls = self.getAhaRegistry()
newurls = await proxy.getAhaUrls(user=ahauser)
oldurls = self.conf.get('aha:registry')
if isinstance(oldurls, str):
oldurls = (oldurls,)
elif isinstance(oldurls, list):
oldurls = tuple(oldurls)

if newurls and newurls != oldurls:
if oldurls[0].startswith('tcp://'):
s_common.deprecated('aha:registry: tcp:// client values.')
logger.warning('tcp:// based aha:registry options are deprecated and will be removed in Synapse v3.0.0')
return

self.modCellConf({'aha:registry': newurls})
self.ahaclient.setBootUrls(newurls)
self.setAhaRegistry(newurls)

if self.conf.get('mirror') is None:
await self._askAhaToLead()

self.ahaclient = await s_telepath.Client.anit(ahaurls, onlink=onlink)
self.onfini(self.ahaclient)
Expand Down Expand Up @@ -1941,13 +2017,32 @@ async def initServiceStorage(self):
pass

async def initNexusSubsystem(self):
if self.cellparent is None:
await self.nexsroot.recover()
await self.nexsroot.startup()
await self.setCellActive(self.conf.get('mirror') is None)

if self.minfree is not None:
self.schedCoro(self._runFreeSpaceLoop())
if self.cellparent is not None:
return

# If we are AHA enabled and think we're the leader,
# we must check before proceeding...
# TODO: or be readonly or a follower of None or something?
mirror = self.conf.get('mirror')
if self.ahaclient is not None:
self._ask
nexs = self.getNexsIndx()

await self.nexsroot.recover()
await self.nexsroot.startup()

# retrieve this again in case it changed
mirror = self.conf.get('mirror')
await self.setCellActive(mirror is None)

if self.minfree is not None:
self.schedCoro(self._runFreeSpaceLoop())

async def setMirror(self, url):
self.conf.update({'mirror': url})
await self.setCellActive(url is None)
await self.nexsroot.startup()

async def _bindDmonListen(self):

Expand Down Expand Up @@ -2174,14 +2269,19 @@ async def promote(self, graceful=False):
logger.warning(f'PROMOTION: Completed leadership handoff to {myurl}{_dispname}')
return

logger.debug(f'PROMOTION: Clearing mirror configuration{_dispname}.')
self.modCellConf({'mirror': None})
if self.ahaclient:

logger.debug(f'PROMOTION: Promoting the nexus root{_dispname}.')
await self.nexsroot.promote()
name = self.conf.get('aha:name')
proxy = await self.ahaclient.proxy(timeout=6)

logger.debug(f'PROMOTION: Setting the cell as active{_dispname}.')
await self.setCellActive(True)
logger.warning(f'PROMOTION: Updating AHA Leadership Term')
async with self.nexslock:
nexs = await self.getNexsIndx()
leadterm = await proxy.nextLeadTerm(self.iden, name, nexs)
self.setCellMeta('aha:term', leadterm.get('term'))

logger.debug(f'PROMOTION: Clearing mirror configuration{_dispname}.')
await self.setMirror(None)

logger.warning(f'PROMOTION: Finished leadership promotion{_dispname}.')

Expand Down Expand Up @@ -2226,14 +2326,8 @@ async def handoff(self, turl, timeout=30):
logger.debug(f'HANDOFF: Mirror has caught up to the current leader, performing promotion{_dispname}.')
await cell.promote()

logger.debug(f'HANDOFF: Setting the service as inactive{_dispname}.')
await self.setCellActive(False)

logger.debug(f'HANDOFF: Configuring service to sync from new leader{_dispname}.')
self.modCellConf({'mirror': turl})

logger.debug(f'HANDOFF: Restarting the nexus{_dispname}.')
await self.nexsroot.startup()
await self.setMirror(turl)

logger.debug(f'HANDOFF: Released nexus lock{_dispname}.')

Expand Down
13 changes: 13 additions & 0 deletions synapse/lib/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@
import synapse.lib.msgpack as s_msgpack


leadTermSchema = {
'type': 'object',
'properties': {
'iden': {'type': 'string', 'pattern': s_config.re_iden},
'name': {'type': 'string'},
'term': {'type': 'number'},
'nexs': {'type': 'number'},
'time': {'type': 'number'},
},
'required': ['iden', 'term', 'nexs', 'name', 'time'],
}
reqValidLeadTerm = s_config.getJsValidator(leadTermSchema)

easyPermSchema = {
'type': 'object',
'properties': {
Expand Down