Skip to content

Commit

Permalink
Optimized a lot of things
Browse files Browse the repository at this point in the history
  • Loading branch information
O-Jiangweidong committed Aug 8, 2024
1 parent 4e8eaaa commit 995aec8
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 50 deletions.
40 changes: 20 additions & 20 deletions apps/behemoth/api/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
Environment, Playback, Plan, Iteration, Execution, Command,
PlaybackExecution, MonthlyVersion
)
from common.utils import is_uuid
from common.utils import is_uuid, get_logger
from common.exceptions import JMSException
from common.utils.timezone import local_now_display
from orgs.mixins.api import OrgBulkModelViewSet
from orgs.utils import get_current_org_id


logger = get_logger(__file__)


class ExecutionMixin:
@staticmethod
def start_task(
Expand Down Expand Up @@ -103,24 +106,20 @@ class MonthlyVersionViewSet(OrgBulkModelViewSet):
'get_playbacks': ['behemoth.view_playbacks']
}

@action(methods=['GET', 'PATCH', 'DELETE'], detail=True, url_path='playbacks')
@action(methods=['PATCH', 'DELETE'], detail=True, url_path='playbacks')
def get_playbacks(self, request, *args, **kwargs):
obj = self.get_object()
if request.method == 'GET':
qs = obj.playbacks.all()
return self.get_paginated_response_from_queryset(qs)
else:
serializer = serializers.PlaybackMonthlyVersionSerializer(data=request.data)
if serializer.is_valid():
assets = serializer.validated_data.get('playbacks')
action_ = serializer.validated_data['action']
if action_ == 'remove':
obj.playbacks.remove(*tuple(assets))
else:
obj.playbacks.add(*tuple(assets))
return Response(status=http_status.HTTP_200_OK)
serializer = serializers.PlaybackMonthlyVersionSerializer(data=request.data)
if serializer.is_valid():
assets = serializer.validated_data.get('playbacks')
action_ = serializer.validated_data['action']
if action_ == 'remove':
obj.playbacks.remove(*tuple(assets))
else:
return Response(status=http_status.HTTP_400_BAD_REQUEST, data=serializer.errors)
obj.playbacks.add(*tuple(assets))
return Response(status=http_status.HTTP_200_OK)
else:
return Response(status=http_status.HTTP_400_BAD_REQUEST, data=serializer.errors)


class PlaybackExecutionViewSet(OrgBulkModelViewSet):
Expand All @@ -142,6 +141,7 @@ def get_queryset(self):
class PlaybackViewSet(OrgBulkModelViewSet):
model = Playback
search_fields = ['name']
filterset_fields = ['monthly_version']
serializer_classes = {
'default': serializers.PlaybackSerializer,
'insert_pause': serializers.InsertPauseSerializer,
Expand Down Expand Up @@ -316,8 +316,8 @@ def get_commands(self, request, *args, **kwargs):
commands[0].output = f.read()
else:
commands[0].output = os.path.basename(commands[0].output)
except Exception: # noqa
pass
except Exception as e: # noqa
logger.warning('Convert command error: %s', e)

serializer = self.get_serializer(commands, many=True)
return Response({'results': serializer.data, 'category': execution.category})
Expand Down Expand Up @@ -383,7 +383,7 @@ def upload_command_file(self, request, *args, **kwargs):
file = self._handle_zip_file(file, entry)

name, ext = os.path.splitext(file.name)
file_name = f'{name}-({local_now_display("%Y_%m_%d_%H_%M_%S")}){ext}'
file_name = f'{name}-{local_now_display("%Y_%m_%d_%H_%M_%S")}{ext}'
execution = self.get_object().create_execution(
with_auth=True, name=file_name, category=ExecutionCategory.file,
version=serializer.validated_data['version']
Expand All @@ -404,7 +404,7 @@ def start_sync_task(self, request, *args, **kwargs):
participants = getattr(settings, 'SYNC_PLAN_REQUIRED_PARTICIPANTS', 2)
wait_timeout = getattr(settings, 'SYNC_PLAN_WAIT_PARTICIPANT_IDLE', 3600)
if len(user_set) >= participants:
cache.set(PLAN_TASK_ACTIVE_KEY.format(obj.id), [], timeout=3600 * 24 * 7)
cache.set(PLAN_TASK_ACTIVE_KEY.format(obj.id), user_set, timeout=wait_timeout * 24 * 7)
return self.start_task(
obj.executions.all(), user_set, response_data={'users': [str(request.user)]}
)
Expand Down
13 changes: 11 additions & 2 deletions apps/behemoth/libs/go_script/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -311,7 +312,7 @@ func (s *LocalScriptHandler) DoCommand(command string) (string, error) {
port := fmt.Sprintf("-P%v", s.opts.Auth.Port)
database := fmt.Sprintf("-D%s", s.opts.Auth.DBName)
sqlPath := fmt.Sprintf("source %s", s.opts.CmdFile)
args = append(args, username, host, port, database, "-t", "-e", sqlPath)
args = append(args, username, host, port, database, "-t", "-vvv", "-e", sqlPath)
} else if s.opts.Script == "sqlplus" {
connectionStr = fmt.Sprintf(
"%s/\"%s\"@%s:%d/%s", s.opts.Auth.Username, s.opts.Auth.Password, s.opts.Auth.Address, s.opts.Auth.Port, s.opts.Auth.DBName,
Expand All @@ -330,7 +331,15 @@ func (s *LocalScriptHandler) DoCommand(command string) (string, error) {
cmd.Dir = s.cmdDir
}
output, err := cmd.CombinedOutput()
ret := strings.TrimSpace(string(output))
ret := string(output)
if s.opts.Script == "mysql" {
re := regexp.MustCompile(`(?s)--------------\n(.*?)\n--------------\n(.*)Bye?$`)
matches := re.FindStringSubmatch(ret)
if len(matches) > 2 {
ret = matches[2]
}
}
ret = strings.TrimSpace(string(output))
if err != nil {
return ret, err
}
Expand Down
4 changes: 2 additions & 2 deletions apps/behemoth/libs/pools/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ def _show_task_summary(execution: Execution, users: list[str]):
print(p.field(_('Start time'), local_now_display()))
print(p.field(_('Execution environment'), execution.plan.environment))
print(p.field(_('Job type'), execution.plan.get_category_display()))
print(p.field(_('Execution asset'), execution.asset))
print(p.field(_('Execution account'), execution.account))
print(p.field(_('Execution asset'), execution.asset or _('empty')))
print(p.field(_('Execution account'), execution.account or _('empty')))
print(p.title(_('Basic info of the task'), end=True))

def work(self, execution: Execution, users: list[str]) -> None:
Expand Down
4 changes: 1 addition & 3 deletions apps/behemoth/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,7 @@ def create_execution(self, with_auth=False, **other):

@property
def playback_executions(self):
obj = ObjectExtend.objects.filter( # noqa
obj_id=self.id
).values('meta').first() or {'meta': {}}
obj = ObjectExtend.objects.filter(obj_id=self.id).values('meta').first() or {'meta': {}} # noqa
return obj['meta'].get('playback_executions', [])


Expand Down
47 changes: 28 additions & 19 deletions apps/behemoth/serializers/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from behemoth.const import (
PlanStrategy, FORMAT_COMMAND_CACHE_KEY, PAUSE_RE, PlaybackStrategy,
FormatType, PlanCategory, PLAN_TASK_ACTIVE_KEY, TaskStatus,
ExecutionCategory,
)


Expand Down Expand Up @@ -130,36 +131,44 @@ def validate(self, attrs):
attrs['category'] = PlanCategory.sync
return attrs

def update(self, instance, validated_data):
validated_data.pop('playback_executions', None)
return super().update(instance, validated_data)

def create(self, validated_data):
execution_ids = validated_data.pop('playback_executions', [])
executions = PlaybackExecution.objects.filter(
id__in=execution_ids
).values('execution_id', 'plan_name', 'meta', 'execution__category')
plan = super().create(validated_data)
ObjectExtend.objects.create(
obj_id=plan.id, category=Plan._meta.db_table,
@staticmethod
def do_plan_other_action(plan, execution_ids):
executions = PlaybackExecution.objects.filter(id__in=execution_ids)
ObjectExtend.objects.create( # noqa
obj_id=plan.id, category=Plan._meta.db_table, # noqa
meta={'playback_executions': execution_ids}
)
# 遍历循环走SQL了吗?So crazy!
for serial, item in enumerate(executions):
asset_name = item['meta'].get('asset', '')
account_username = item['meta'].get('account', '')
asset_name = item.meta.get('asset', '')
account_username = item.meta.get('account', '')
execution = plan.create_execution(
asset_name=asset_name, account_username=account_username,
category=item['execution__category']
name=item.execution.name, asset_name=asset_name,
account_username=account_username, category=item.execution.category,
version=item.execution.version
)
command_objs = []
commands = Command.objects.filter(execution_id=item['execution_id']).order_by('index')
command_objs, command_extra = [], []
if execution.category == ExecutionCategory.pause:
command_extra.append('output')
commands = Command.objects.filter(execution_id=item.execution_id).order_by('index')
for idx, command in enumerate(commands):
command_objs.append(
Command(**command.to_dict(), index=idx, execution_id=execution.id)
Command(**command.to_dict(command_extra), index=idx, execution_id=execution.id)
)
Command.objects.bulk_create(command_objs)
commands.filter(has_delete=True).delete()

def update(self, instance, validated_data):
execution_ids = validated_data.pop('playback_executions', None)
plan = super().update(instance, validated_data)
plan.executions.all().delete()
self.do_plan_other_action(plan, execution_ids)
return plan

def create(self, validated_data):
execution_ids = validated_data.pop('playback_executions', [])
plan = super().create(validated_data)
self.do_plan_other_action(plan, execution_ids)
return plan


Expand Down
9 changes: 5 additions & 4 deletions apps/behemoth/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ def run_task_sync(executions: list[Execution], users: list[str]):
if worker_pool.is_task_failed(pre_task_id):
break

if execution.category == ExecutionCategory.pause:
if num == 1 and execution.category == ExecutionCategory.pause:
execution.status = TaskStatus.success
execution.save(update_fields=['status'])
total -= 1
print(p.info('检测到第一条命令类型为暂停,认为已经执行过,跳过处理'))

print(p.info(
_('There are %s batches of tasks in total. The %sth task has started to execute.'
) % (total, num))
)
try:
execution.status = TaskStatus.executing
execution.save(update_fields=['status'])
if execution.status != TaskStatus.success:
execution.status = TaskStatus.executing
execution.save(update_fields=['status'])

if execution.plan.category == PlanCategory.sync:
environment = execution.plan.environment
Expand Down

0 comments on commit 995aec8

Please sign in to comment.