Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions synapse/lib/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import inspect
import logging
import weakref
import tempfile
import contextlib
import collections

Expand Down Expand Up @@ -59,6 +60,28 @@ def _fini_atexit(): # pragma: no cover

atexit.register(_fini_atexit)

def _ioWorkProc(todo, sockpath):

async def workloop():

import synapse.daemon as s_daemon

async with await s_daemon.Daemon.anit() as dmon:

func, args, kwargs = todo

item = await func(*args, **kwargs)

dmon.share('dmon', dmon)
dmon.share('item', item)

# bind last so we're ready to go...
await dmon.listen(f'unix://{sockpath}')
await item.waitfini()

asyncio.run(workloop())
sys.exit(0)

class Base:
'''
Base class for Synapse objects.
Expand Down Expand Up @@ -113,6 +136,60 @@ async def anit(cls, *args, **kwargs):

return self

@classmethod
async def spawn(cls, *args, **kwargs):
return await cls._spawn(args, kwargs)

@classmethod
def spawner(cls, base=None, sockpath=None):
async def _spawn(*args, **kwargs):
return await cls._spawn(args, kwargs, base=base, sockpath=sockpath)
return _spawn

@classmethod
async def _spawn(cls, args, kwargs, base=None, sockpath=None):

# avoid circular imports... *shrug*
import synapse.common as s_common
import synapse.telepath as s_telepath
import synapse.lib.link as s_link

todo = (cls.anit, args, kwargs)

iden = s_common.guid()

if sockpath is None:
tmpdir = tempfile.gettempdir()
sockpath = s_common.genpath(tmpdir, iden)

if base is None:
base = await Base.anit()

base.schedCoro(s_coro.spawn((_ioWorkProc, (todo, sockpath), {})))

await s_link.unixwait(sockpath)

proxy = await s_telepath.openurl(f'unix://{sockpath}:item')

async def fini():

try:
async with await s_telepath.openurl(f'unix://{sockpath}:item') as finiproxy:
await finiproxy.taskv2(('fini', (), {}))
except Exception:
# This can fail if the subprocess was terminated from outside...
pass

if not base.isfini:
logger.error(f'IO Worker Socket Closed: {sockpath}')

# FIXME should we bind the proxy to fini the base (cell?) since it's borked?
await base.fini()

proxy.onfini(fini)

return proxy

async def __anit__(self):

self.loop = asyncio.get_running_loop()
Expand Down
56 changes: 35 additions & 21 deletions synapse/lib/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,8 @@ async def __anit__(self, dirn, conf=None, readonly=False, parent=None):
s_telepath.Aware.__init__(self)

self.dirn = s_common.gendir(dirn)
self.sockdirn = s_common.gendir(dirn, 'sockets')

self.runid = s_common.guid()

self.auth = None
Expand Down Expand Up @@ -1519,14 +1521,16 @@ async def _storCellAuthMigration(self):
logger.warning(f'...Cell ({self.getCellType()}) auth migration complete!')

async def _drivePermMigration(self):
for lkey, lval in self.slab.scanByPref(s_drive.LKEY_INFO, db=self.drive.dbname):
info = s_msgpack.un(lval)
perm = info.pop('perm', None)
if perm is not None:
perm.setdefault('users', {})
perm.setdefault('roles', {})
info['permissions'] = perm
self.slab.put(lkey, s_msgpack.en(info), db=self.drive.dbname)
pass
# we might just need to hard-code dbname here and let it ride...
# for lkey, lval in self.slab.scanByPref(s_drive.LKEY_INFO, db=self.drive.dbname):
# info = s_msgpack.un(lval)
# perm = info.pop('perm', None)
# if perm is not None:
# perm.setdefault('users', {})
# perm.setdefault('roles', {})
# info['permissions'] = perm
# self.slab.put(lkey, s_msgpack.en(info), db=self.drive.dbname)

def getPermDef(self, perm):
perm = tuple(perm)
Expand Down Expand Up @@ -1681,6 +1685,8 @@ def _delTmpFiles(self):
shutil.rmtree(path, ignore_errors=True)
continue

# FIXME - recursively remove sockets dir here?

async def _execCellUpdates(self):
# implement to apply updates to a fully initialized active cell
# ( and do so using _bumpCellVers )
Expand Down Expand Up @@ -1892,7 +1898,13 @@ async def initServiceEarly(self):
pass

async def initCellStorage(self):
self.drive = await s_drive.Drive.anit(self.slab, 'celldrive')
path = s_common.gendir(self.dirn, 'slabs', 'drive.lmdb')
sockpath = s_common.genpath(self.sockdirn, 'drive')

spawner = s_drive.FileDrive.spawner(base=self, sockpath=sockpath)

self.drive = await spawner(path)

await self._bumpCellVers('drive:storage', (
(1, self._drivePermMigration),
), nexs=False)
Expand All @@ -1915,7 +1927,7 @@ async def _addDriveItem(self, info, path=None, reldir=s_drive.rootdir):

# replay safety...
iden = info.get('iden')
if self.drive.hasItemInfo(iden): # pragma: no cover
if await self.drive.hasItemInfo(iden): # pragma: no cover
return await self.drive.getItemPath(iden)

# TODO: Remove this in synapse-3xx
Expand All @@ -1928,10 +1940,10 @@ async def _addDriveItem(self, info, path=None, reldir=s_drive.rootdir):
return await self.drive.addItemInfo(info, path=path, reldir=reldir)

async def getDriveInfo(self, iden, typename=None):
return self.drive.getItemInfo(iden, typename=typename)
return await self.drive.getItemInfo(iden, typename=typename)

def reqDriveInfo(self, iden, typename=None):
return self.drive.reqItemInfo(iden, typename=typename)
async def reqDriveInfo(self, iden, typename=None):
return await self.drive.reqItemInfo(iden, typename=typename)

async def getDrivePath(self, path, reldir=s_drive.rootdir):
'''
Expand All @@ -1954,14 +1966,16 @@ async def addDrivePath(self, path, perm=None, reldir=s_drive.rootdir):
'''
tick = s_common.now()
user = self.auth.rootuser.iden
path = self.drive.getPathNorm(path)
path = await self.drive.getPathNorm(path)

if perm is None:
perm = {'users': {}, 'roles': {}}

for name in path:

info = self.drive.getStepInfo(reldir, name)
info = await self.drive.getStepInfo(reldir, name)

# we could skip this now ;)
await asyncio.sleep(0)

if info is not None:
Expand All @@ -1985,20 +1999,20 @@ async def getDriveData(self, iden, vers=None):
Return the data associated with the drive item by iden.
If vers is specified, return that specific version.
'''
return self.drive.getItemData(iden, vers=vers)
return await self.drive.getItemData(iden, vers=vers)

async def getDriveDataVersions(self, iden):
async for item in self.drive.getItemDataVersions(iden):
yield item

@s_nexus.Pusher.onPushAuto('drive:del')
async def delDriveInfo(self, iden):
if self.drive.getItemInfo(iden) is not None:
if await self.drive.getItemInfo(iden) is not None:
await self.drive.delItemInfo(iden)

@s_nexus.Pusher.onPushAuto('drive:set:perm')
async def setDriveInfoPerm(self, iden, perm):
return self.drive.setItemPerm(iden, perm)
return await self.drive.setItemPerm(iden, perm)

@s_nexus.Pusher.onPushAuto('drive:data:path:set')
async def setDriveItemProp(self, iden, vers, path, valu):
Expand Down Expand Up @@ -2047,7 +2061,7 @@ async def delDriveItemProp(self, iden, vers, path):
@s_nexus.Pusher.onPushAuto('drive:set:path')
async def setDriveInfoPath(self, iden, path):

path = self.drive.getPathNorm(path)
path = await self.drive.getPathNorm(path)
pathinfo = await self.drive.getItemPath(iden)
if path == [p.get('name') for p in pathinfo]:
return pathinfo
Expand All @@ -2060,13 +2074,13 @@ async def setDriveData(self, iden, versinfo, data):

async def delDriveData(self, iden, vers=None):
if vers is None:
info = self.drive.reqItemInfo(iden)
info = await self.drive.reqItemInfo(iden)
vers = info.get('version')
return await self._push('drive:data:del', iden, vers)

@s_nexus.Pusher.onPush('drive:data:del')
async def _delDriveData(self, iden, vers):
return self.drive.delItemData(iden, vers)
return await self.drive.delItemData(iden, vers)

async def getDriveKids(self, iden):
async for info in self.drive.getItemKids(iden):
Expand Down
2 changes: 2 additions & 0 deletions synapse/lib/coro.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ def execspawn():
coro = executor(execspawn)

retn = await s_common.wait_for(coro, timeout=timeout)
proc.terminate()

if isinstance(retn, Exception):
raise retn

Expand Down
Loading