diff --git a/apps/behemoth/api/generic.py b/apps/behemoth/api/generic.py index 58e1fcb54782..b1f5268634df 100644 --- a/apps/behemoth/api/generic.py +++ b/apps/behemoth/api/generic.py @@ -24,13 +24,16 @@ Environment, Playback, Plan, Iteration, Execution, Command, PlaybackExecution, MonthlyVersion ) -from common.utils import is_uuid +from common.utils import is_uuid, get_logger from common.exceptions import JMSException from common.utils.timezone import local_now_display from orgs.mixins.api import OrgBulkModelViewSet from orgs.utils import get_current_org_id +logger = get_logger(__file__) + + class ExecutionMixin: @staticmethod def start_task( @@ -103,24 +106,20 @@ class MonthlyVersionViewSet(OrgBulkModelViewSet): 'get_playbacks': ['behemoth.view_playbacks'] } - @action(methods=['GET', 'PATCH', 'DELETE'], detail=True, url_path='playbacks') + @action(methods=['PATCH', 'DELETE'], detail=True, url_path='playbacks') def get_playbacks(self, request, *args, **kwargs): obj = self.get_object() - if request.method == 'GET': - qs = obj.playbacks.all() - return self.get_paginated_response_from_queryset(qs) - else: - serializer = serializers.PlaybackMonthlyVersionSerializer(data=request.data) - if serializer.is_valid(): - assets = serializer.validated_data.get('playbacks') - action_ = serializer.validated_data['action'] - if action_ == 'remove': - obj.playbacks.remove(*tuple(assets)) - else: - obj.playbacks.add(*tuple(assets)) - return Response(status=http_status.HTTP_200_OK) + serializer = serializers.PlaybackMonthlyVersionSerializer(data=request.data) + if serializer.is_valid(): + assets = serializer.validated_data.get('playbacks') + action_ = serializer.validated_data['action'] + if action_ == 'remove': + obj.playbacks.remove(*tuple(assets)) else: - return Response(status=http_status.HTTP_400_BAD_REQUEST, data=serializer.errors) + obj.playbacks.add(*tuple(assets)) + return Response(status=http_status.HTTP_200_OK) + else: + return Response(status=http_status.HTTP_400_BAD_REQUEST, data=serializer.errors) class PlaybackExecutionViewSet(OrgBulkModelViewSet): @@ -142,6 +141,7 @@ def get_queryset(self): class PlaybackViewSet(OrgBulkModelViewSet): model = Playback search_fields = ['name'] + filterset_fields = ['monthly_version'] serializer_classes = { 'default': serializers.PlaybackSerializer, 'insert_pause': serializers.InsertPauseSerializer, @@ -316,8 +316,8 @@ def get_commands(self, request, *args, **kwargs): commands[0].output = f.read() else: commands[0].output = os.path.basename(commands[0].output) - except Exception: # noqa - pass + except Exception as e: # noqa + logger.warning('Convert command error: %s', e) serializer = self.get_serializer(commands, many=True) return Response({'results': serializer.data, 'category': execution.category}) @@ -383,7 +383,7 @@ def upload_command_file(self, request, *args, **kwargs): file = self._handle_zip_file(file, entry) name, ext = os.path.splitext(file.name) - file_name = f'{name}-({local_now_display("%Y_%m_%d_%H_%M_%S")}){ext}' + file_name = f'{name}-{local_now_display("%Y_%m_%d_%H_%M_%S")}{ext}' execution = self.get_object().create_execution( with_auth=True, name=file_name, category=ExecutionCategory.file, version=serializer.validated_data['version'] @@ -404,7 +404,7 @@ def start_sync_task(self, request, *args, **kwargs): participants = getattr(settings, 'SYNC_PLAN_REQUIRED_PARTICIPANTS', 2) wait_timeout = getattr(settings, 'SYNC_PLAN_WAIT_PARTICIPANT_IDLE', 3600) if len(user_set) >= participants: - cache.set(PLAN_TASK_ACTIVE_KEY.format(obj.id), [], timeout=3600 * 24 * 7) + cache.set(PLAN_TASK_ACTIVE_KEY.format(obj.id), user_set, timeout=wait_timeout * 24 * 7) return self.start_task( obj.executions.all(), user_set, response_data={'users': [str(request.user)]} ) diff --git a/apps/behemoth/libs/go_script/script.go b/apps/behemoth/libs/go_script/script.go index 6a4739923db2..576690ca9401 100755 --- a/apps/behemoth/libs/go_script/script.go +++ b/apps/behemoth/libs/go_script/script.go @@ -17,6 +17,7 @@ import ( "os" "os/exec" "path/filepath" + "regexp" "strings" "syscall" "time" @@ -311,7 +312,7 @@ func (s *LocalScriptHandler) DoCommand(command string) (string, error) { port := fmt.Sprintf("-P%v", s.opts.Auth.Port) database := fmt.Sprintf("-D%s", s.opts.Auth.DBName) sqlPath := fmt.Sprintf("source %s", s.opts.CmdFile) - args = append(args, username, host, port, database, "-t", "-e", sqlPath) + args = append(args, username, host, port, database, "-t", "-vvv", "-e", sqlPath) } else if s.opts.Script == "sqlplus" { connectionStr = fmt.Sprintf( "%s/\"%s\"@%s:%d/%s", s.opts.Auth.Username, s.opts.Auth.Password, s.opts.Auth.Address, s.opts.Auth.Port, s.opts.Auth.DBName, @@ -330,7 +331,15 @@ func (s *LocalScriptHandler) DoCommand(command string) (string, error) { cmd.Dir = s.cmdDir } output, err := cmd.CombinedOutput() - ret := strings.TrimSpace(string(output)) + ret := string(output) + if s.opts.Script == "mysql" { + re := regexp.MustCompile(`(?s)--------------\n(.*?)\n--------------\n(.*)Bye?$`) + matches := re.FindStringSubmatch(ret) + if len(matches) > 2 { + ret = matches[2] + } + } + ret = strings.TrimSpace(string(output)) if err != nil { return ret, err } diff --git a/apps/behemoth/libs/pools/worker.py b/apps/behemoth/libs/pools/worker.py index bcc554da04e8..3297fe2eca60 100644 --- a/apps/behemoth/libs/pools/worker.py +++ b/apps/behemoth/libs/pools/worker.py @@ -283,8 +283,8 @@ def _show_task_summary(execution: Execution, users: list[str]): print(p.field(_('Start time'), local_now_display())) print(p.field(_('Execution environment'), execution.plan.environment)) print(p.field(_('Job type'), execution.plan.get_category_display())) - print(p.field(_('Execution asset'), execution.asset)) - print(p.field(_('Execution account'), execution.account)) + print(p.field(_('Execution asset'), execution.asset or _('empty'))) + print(p.field(_('Execution account'), execution.account or _('empty'))) print(p.title(_('Basic info of the task'), end=True)) def work(self, execution: Execution, users: list[str]) -> None: diff --git a/apps/behemoth/models.py b/apps/behemoth/models.py index 89ea8a5c17a1..0d4f2c355514 100644 --- a/apps/behemoth/models.py +++ b/apps/behemoth/models.py @@ -316,9 +316,7 @@ def create_execution(self, with_auth=False, **other): @property def playback_executions(self): - obj = ObjectExtend.objects.filter( # noqa - obj_id=self.id - ).values('meta').first() or {'meta': {}} + obj = ObjectExtend.objects.filter(obj_id=self.id).values('meta').first() or {'meta': {}} # noqa return obj['meta'].get('playback_executions', []) diff --git a/apps/behemoth/serializers/plan.py b/apps/behemoth/serializers/plan.py index 36d2271b04cd..b0ba060b0e71 100644 --- a/apps/behemoth/serializers/plan.py +++ b/apps/behemoth/serializers/plan.py @@ -21,6 +21,7 @@ from behemoth.const import ( PlanStrategy, FORMAT_COMMAND_CACHE_KEY, PAUSE_RE, PlaybackStrategy, FormatType, PlanCategory, PLAN_TASK_ACTIVE_KEY, TaskStatus, + ExecutionCategory, ) @@ -130,36 +131,44 @@ def validate(self, attrs): attrs['category'] = PlanCategory.sync return attrs - def update(self, instance, validated_data): - validated_data.pop('playback_executions', None) - return super().update(instance, validated_data) - - def create(self, validated_data): - execution_ids = validated_data.pop('playback_executions', []) - executions = PlaybackExecution.objects.filter( - id__in=execution_ids - ).values('execution_id', 'plan_name', 'meta', 'execution__category') - plan = super().create(validated_data) - ObjectExtend.objects.create( - obj_id=plan.id, category=Plan._meta.db_table, + @staticmethod + def do_plan_other_action(plan, execution_ids): + executions = PlaybackExecution.objects.filter(id__in=execution_ids) + ObjectExtend.objects.create( # noqa + obj_id=plan.id, category=Plan._meta.db_table, # noqa meta={'playback_executions': execution_ids} ) # 遍历循环走SQL了吗?So crazy! for serial, item in enumerate(executions): - asset_name = item['meta'].get('asset', '') - account_username = item['meta'].get('account', '') + asset_name = item.meta.get('asset', '') + account_username = item.meta.get('account', '') execution = plan.create_execution( - asset_name=asset_name, account_username=account_username, - category=item['execution__category'] + name=item.execution.name, asset_name=asset_name, + account_username=account_username, category=item.execution.category, + version=item.execution.version ) - command_objs = [] - commands = Command.objects.filter(execution_id=item['execution_id']).order_by('index') + command_objs, command_extra = [], [] + if execution.category == ExecutionCategory.pause: + command_extra.append('output') + commands = Command.objects.filter(execution_id=item.execution_id).order_by('index') for idx, command in enumerate(commands): command_objs.append( - Command(**command.to_dict(), index=idx, execution_id=execution.id) + Command(**command.to_dict(command_extra), index=idx, execution_id=execution.id) ) Command.objects.bulk_create(command_objs) commands.filter(has_delete=True).delete() + + def update(self, instance, validated_data): + execution_ids = validated_data.pop('playback_executions', None) + plan = super().update(instance, validated_data) + plan.executions.all().delete() + self.do_plan_other_action(plan, execution_ids) + return plan + + def create(self, validated_data): + execution_ids = validated_data.pop('playback_executions', []) + plan = super().create(validated_data) + self.do_plan_other_action(plan, execution_ids) return plan diff --git a/apps/behemoth/tasks.py b/apps/behemoth/tasks.py index 336cd3235529..10bb95772c33 100644 --- a/apps/behemoth/tasks.py +++ b/apps/behemoth/tasks.py @@ -17,18 +17,19 @@ def run_task_sync(executions: list[Execution], users: list[str]): if worker_pool.is_task_failed(pre_task_id): break - if execution.category == ExecutionCategory.pause: + if num == 1 and execution.category == ExecutionCategory.pause: execution.status = TaskStatus.success execution.save(update_fields=['status']) - total -= 1 + print(p.info('检测到第一条命令类型为暂停,认为已经执行过,跳过处理')) print(p.info( _('There are %s batches of tasks in total. The %sth task has started to execute.' ) % (total, num)) ) try: - execution.status = TaskStatus.executing - execution.save(update_fields=['status']) + if execution.status != TaskStatus.success: + execution.status = TaskStatus.executing + execution.save(update_fields=['status']) if execution.plan.category == PlanCategory.sync: environment = execution.plan.environment