diff --git a/VERSION.in b/VERSION.in index a58941b07..840ca8cbf 100644 --- a/VERSION.in +++ b/VERSION.in @@ -1 +1 @@ -1.3 \ No newline at end of file +1.4 \ No newline at end of file diff --git a/ci/run_python_lint.sh b/ci/run_python_lint.sh index 488b9c946..57d4f55b3 100755 --- a/ci/run_python_lint.sh +++ b/ci/run_python_lint.sh @@ -51,4 +51,5 @@ echo "Running lint for rqd/..." cd rqd python -m pylint --rcfile=../ci/pylintrc_main rqd --ignore=rqd/compiled_proto python -m pylint --rcfile=../ci/pylintrc_test tests +python -m pylint --rcfile=../ci/pylintrc_test pytests cd .. diff --git a/ci/run_python_tests.sh b/ci/run_python_tests.sh index 1259adf4c..caa66e30d 100755 --- a/ci/run_python_tests.sh +++ b/ci/run_python_tests.sh @@ -27,6 +27,7 @@ PYTHONPATH=pycue python -m unittest discover -s pyoutline/tests -t pyoutline -p PYTHONPATH=pycue python -m unittest discover -s cueadmin/tests -t cueadmin -p "*.py" PYTHONPATH=pycue:pyoutline python -m unittest discover -s cuesubmit/tests -t cuesubmit -p "*.py" python -m pytest rqd/tests +python -m pytest rqd/pytests # Xvfb no longer supports Python 2. if [[ "$python_version" =~ "Python 3" && ${args[0]} != "--no-gui" ]]; then diff --git a/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java b/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java index 1bd3806a9..3aa4d60de 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java +++ b/cuebot/src/main/java/com/imageworks/spcue/DispatchFrame.java @@ -72,5 +72,9 @@ public void setMinMemory(long minMemory) { public long getMinMemory() { return this.minMemory; } + + // Parameters to tell rqd whether or not to use Loki for frame logs and which base url to use + public boolean lokiEnabled; + public String lokiURL; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/JobDetail.java b/cuebot/src/main/java/com/imageworks/spcue/JobDetail.java index dad6f8a6d..2b837b836 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/JobDetail.java +++ b/cuebot/src/main/java/com/imageworks/spcue/JobDetail.java @@ -59,5 +59,10 @@ public class JobDetail extends JobEntity implements JobInterface, DepartmentInte public String getDepartmentId() { return deptId; } + + // Parameters to tell cuebot whether or not to Loki is used for frame logs of the job and which + // base url to use for querying them + public Boolean logLokiEnabled; + public String logLokiURL; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java index 010294323..c69b9ae86 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/DispatchQuery.java @@ -531,6 +531,8 @@ private static final String replaceQueryForFifo(String query) { "int_uid, " + "str_log_dir, " + "COALESCE(str_os, '') AS str_os, " + + "b_loki_enabled, " + + "str_loki_url, " + "frame_name, " + "frame_state, " + "pk_frame, " + @@ -572,6 +574,8 @@ private static final String replaceQueryForFifo(String query) { "job.int_uid, " + "job.str_log_dir, " + "job.str_os, " + + "job.b_loki_enabled, " + + "job.str_loki_url, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -659,6 +663,8 @@ private static final String replaceQueryForFifo(String query) { "job.int_uid, " + "job.str_log_dir, " + "job.str_os, " + + "job.b_loki_enabled, " + + "job.str_loki_url, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -747,6 +753,8 @@ private static final String replaceQueryForFifo(String query) { "job.int_uid, " + "job.str_log_dir, " + "job.str_os, " + + "job.b_loki_enabled, " + + "job.str_loki_url, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -828,6 +836,8 @@ private static final String replaceQueryForFifo(String query) { "job.int_uid, " + "job.str_log_dir, " + "job.str_os, " + + "job.b_loki_enabled, " + + "job.str_loki_url, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -912,6 +922,8 @@ private static final String replaceQueryForFifo(String query) { "job.int_uid, " + "job.str_log_dir, " + "job.str_os, " + + "job.b_loki_enabled, " + + "job.str_loki_url, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -999,6 +1011,8 @@ private static final String replaceQueryForFifo(String query) { "job.int_uid, " + "job.str_log_dir, " + "job.str_os, " + + "job.b_loki_enabled, " + + "job.str_loki_url, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -1087,6 +1101,8 @@ private static final String replaceQueryForFifo(String query) { "job.int_uid, " + "job.str_log_dir, " + "job.str_os, " + + "job.b_loki_enabled, " + + "job.str_loki_url, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + @@ -1168,6 +1184,8 @@ private static final String replaceQueryForFifo(String query) { "job.int_uid, " + "job.str_log_dir, " + "job.str_os, " + + "job.b_loki_enabled, " + + "job.str_loki_url, " + "frame.str_name AS frame_name, " + "frame.str_state AS frame_state, " + "frame.pk_frame, " + diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java index c25e00874..19a49f5b1 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java @@ -332,6 +332,8 @@ public DispatchFrame mapRow(ResultSet rs, int rowNum) throws SQLException { frame.version = rs.getInt("int_version"); frame.services = rs.getString("str_services"); frame.os = rs.getString("str_os"); + frame.lokiEnabled = rs.getBoolean("b_loki_enabled"); + frame.lokiURL = rs.getString("str_loki_url"); return frame; } }; @@ -349,6 +351,8 @@ public DispatchFrame mapRow(ResultSet rs, int rowNum) throws SQLException { "job.int_uid,"+ "job.str_log_dir,"+ "COALESCE(str_os, '') AS str_os, " + + "job.b_loki_enabled,"+ + "job.str_loki_url,"+ "frame.str_name AS frame_name, "+ "frame.str_state AS frame_state, "+ "frame.pk_frame, "+ diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/JobDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/JobDaoJdbc.java index 872ab41d7..423c66ad6 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/JobDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/JobDaoJdbc.java @@ -137,6 +137,8 @@ public JobDetail mapRow(ResultSet rs, int rowNum) throws SQLException { job.showName = rs.getString("show_name"); job.facilityName = rs.getString("facility_name"); job.deptName = rs.getString("dept_name"); + job.logLokiEnabled = rs.getBoolean("b_loki_enabled"); + job.logLokiURL = rs.getString("str_loki_url"); return job; } }; @@ -206,6 +208,8 @@ public boolean isJobComplete(JobInterface job) { "job.pk_dept,"+ "job.pk_folder,"+ "job.str_log_dir,"+ + "job.b_loki_enabled,"+ + "job.str_loki_url,"+ "job.str_name,"+ "job.str_shot,"+ "job.str_state,"+ @@ -473,20 +477,25 @@ public boolean updateJobFinished(JobInterface job) { "int_uid," + "b_paused," + "b_autoeat,"+ - "int_max_retries " + + "int_max_retries," + + "b_loki_enabled," + + "str_loki_url " + ") " + - "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; @Override public void insertJob(JobDetail j, JobLogUtil jobLogUtil) { j.id = SqlUtil.genKeyRandom(); j.logDir = jobLogUtil.getJobLogPath(j); + j.logLokiEnabled = jobLogUtil.getLokiIsEnabled(); + j.logLokiURL = jobLogUtil.getLokiURL(); if (j.minCoreUnits < 100) { j.minCoreUnits = 100; } getJdbcTemplate().update(INSERT_JOB, j.id, j.showId, j.groupId, j.facilityId, j.deptId, j.name, j.name, j.showName, j.shot, j.user, j.email, j.state.toString(), - j.logDir, j.os, j.uid.orElse(null), j.isPaused, j.isAutoEat, j.maxRetries); + j.logDir, j.os, j.uid.orElse(null), j.isPaused, j.isAutoEat, j.maxRetries, + j.logLokiEnabled, j.logLokiURL); } private static final String JOB_EXISTS = diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.java index 1bc6bed59..33b0c4d33 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/WhiteboardDaoJdbc.java @@ -1190,7 +1190,9 @@ public Job mapRow(ResultSet rs, int rowNum) throws SQLException { .setHasComment(rs.getBoolean("b_comment")) .setAutoEat(rs.getBoolean("b_autoeat")) .setStartTime((int) (rs.getTimestamp("ts_started").getTime() / 1000)) - .setOs(SqlUtil.getString(rs,"str_os")); + .setOs(SqlUtil.getString(rs,"str_os")) + .setLokiEnabled(rs.getBoolean("b_loki_enabled")) + .setLokiUrl(SqlUtil.getString(rs, "str_loki_url")); int uid = rs.getInt("int_uid"); if (!rs.wasNull()) { @@ -1935,6 +1937,8 @@ public Show mapRow(ResultSet rs, int rowNum) throws SQLException { "SELECT " + "job.pk_job,"+ "job.str_log_dir," + + "job.b_loki_enabled," + + "job.str_loki_url," + "job_resource.int_max_cores," + "job_resource.int_min_cores," + "job_resource.int_max_gpus," + diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java index 80a1ff362..7cd03781d 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java @@ -385,6 +385,8 @@ public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) { .setShow(frame.show) .setUserName(frame.owner) .setLogDir(frame.logDir) + .setLokiEnabled(frame.lokiEnabled) + .setLokiUrl(frame.lokiURL) .setJobId(frame.jobId) .setJobName(frame.jobName) .setFrameId(frame.id) diff --git a/cuebot/src/main/java/com/imageworks/spcue/util/JobLogUtil.java b/cuebot/src/main/java/com/imageworks/spcue/util/JobLogUtil.java index c223ebbc0..5f96eaeb6 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/util/JobLogUtil.java +++ b/cuebot/src/main/java/com/imageworks/spcue/util/JobLogUtil.java @@ -66,4 +66,12 @@ public String getJobLogRootDir(String os) { return env.getRequiredProperty("log.frame-log-root.default_os", String.class); } } -} \ No newline at end of file + + public Boolean getLokiIsEnabled() { + return env.getRequiredProperty("log.loki.enabled", Boolean.class); + } + + public String getLokiURL() { + return env.getRequiredProperty("log.loki.url", String.class); + } +} diff --git a/cuebot/src/main/resources/conf/ddl/postgres/migrations/V32__Add_loki_job_fields.sql b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V32__Add_loki_job_fields.sql new file mode 100644 index 000000000..7d5409cc7 --- /dev/null +++ b/cuebot/src/main/resources/conf/ddl/postgres/migrations/V32__Add_loki_job_fields.sql @@ -0,0 +1,6 @@ +alter table job + add b_loki_enabled bool; + +alter table job + add str_loki_url varchar(256); + diff --git a/cuebot/src/main/resources/opencue.properties b/cuebot/src/main/resources/opencue.properties index 0340da60e..12a821a4d 100644 --- a/cuebot/src/main/resources/opencue.properties +++ b/cuebot/src/main/resources/opencue.properties @@ -61,6 +61,14 @@ log.frame-log-root.default_os=${CUE_FRAME_LOG_DIR:/shots} # are planning to use a folder in the root, use: # - log.frame-log-root.Windows=${S:} +# Loki +# To enable rqd frame logs to Loki enable this flag and configure the url as shown below. When Loki +# rqd frame logs are enable all frame logs will be streamed to the loki server instead of using the +# CUE_FRAME_LOG_DIR filesystem path. Refer to the documentaion on how to configure Loki. +log.loki.enabled=false +# This is the base url of the Loki server. If the url is not reachable, rqd will fail running frames. +log.loki.url=http://localhost/loki/api + # Maximum number of jobs to query. dispatcher.job_query_max=20 # Number of seconds before waiting to book the same job from a different host. diff --git a/cuebot/src/test/resources/opencue.properties b/cuebot/src/test/resources/opencue.properties index 5ec6fba06..b04585bb9 100644 --- a/cuebot/src/test/resources/opencue.properties +++ b/cuebot/src/test/resources/opencue.properties @@ -93,4 +93,8 @@ dispatcher.memory.mem_reserved_min = 262144 dispatcher.memory.mem_reserved_system = 524288 dispatcher.memory.mem_gpu_reserved_default = 0 dispatcher.memory.mem_gpu_reserved_min = 0 -dispatcher.memory.mem_gpu_reserved_max = 104857600 \ No newline at end of file +dispatcher.memory.mem_gpu_reserved_max = 104857600 + +# Loki +log.loki.enabled = false +log.loki.url = http://localhost/loki/api diff --git a/cuegui/cuegui/App.py b/cuegui/cuegui/App.py index 8e5409520..6a48e1ca6 100644 --- a/cuegui/cuegui/App.py +++ b/cuegui/cuegui/App.py @@ -30,6 +30,7 @@ class CueGuiApplication(QtWidgets.QApplication): # Global signals display_log_file_content = QtCore.Signal(object) + select_frame = QtCore.Signal(object, object) double_click = QtCore.Signal(object) facility_changed = QtCore.Signal() single_click = QtCore.Signal(object) diff --git a/cuegui/cuegui/FrameMonitorTree.py b/cuegui/cuegui/FrameMonitorTree.py index 46da7ee30..108e907fc 100644 --- a/cuegui/cuegui/FrameMonitorTree.py +++ b/cuegui/cuegui/FrameMonitorTree.py @@ -359,6 +359,7 @@ def __itemSingleClickedViewLog(self, item, col): old_log_files = [] self.app.display_log_file_content.emit([current_log_file] + old_log_files) + self.app.select_frame.emit(self.__job, item.rpcObject) def __itemDoubleClickedViewLog(self, item, col): """Called when a frame is double clicked, views the frame log in a popup diff --git a/cuegui/cuegui/plugins/LokiViewPlugin.py b/cuegui/cuegui/plugins/LokiViewPlugin.py new file mode 100644 index 000000000..c7918434e --- /dev/null +++ b/cuegui/cuegui/plugins/LokiViewPlugin.py @@ -0,0 +1,154 @@ +# Copyright Contributors to the OpenCue Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Plugin for viewing loki logs.""" + +import string +import time +import datetime + +from qtpy import QtCore +from qtpy import QtWidgets + +from opencue.wrappers import job, frame + +from loki_urllib3_client import LokiClient + +import cuegui.Constants +import cuegui.AbstractDockWidget + +PLUGIN_NAME = 'LokiView' +PLUGIN_CATEGORY = 'Other' +PLUGIN_DESCRIPTION = 'Displays Frame Log from Loki' +PLUGIN_PROVIDES = 'LokiViewPlugin' +PRINTABLE = set(string.printable) + +class LokiViewWidget(QtWidgets.QWidget): + """ + Displays the log file for the selected frame + """ + client = None + def __init__(self, parent=None): + super().__init__(parent) + self.app = cuegui.app() + self.verticalLayout = QtWidgets.QVBoxLayout(self) + self.horizontalLayout = QtWidgets.QHBoxLayout() + self.frameNameLabel = QtWidgets.QLabel(self) + self.horizontalLayout.addWidget(self.frameNameLabel) + self.frameLogCombo = QtWidgets.QComboBox(self) + self.frameLogCombo.setSizeAdjustPolicy(QtWidgets.QComboBox.AdjustToContents) + self.horizontalLayout.addWidget(self.frameLogCombo) + self.wordWrapCheck = QtWidgets.QCheckBox(self) + self.wordWrapCheck.setText("Word Wrap") + self.horizontalLayout.addWidget(self.wordWrapCheck) + self.refreshButton = QtWidgets.QPushButton(self) + self.refreshButton.setText("Refresh") + self.horizontalLayout.addWidget(self.refreshButton) + self.horizontalLayout.setStretch(0, 1) + self.verticalLayout.addLayout(self.horizontalLayout) + self.frameText = QtWidgets.QTextEdit(self) + self.frameText.setStyleSheet("pre {display: inline;}") + self.frameText.setLineWrapMode(QtWidgets.QTextEdit.NoWrap) + self.frameText.setReadOnly(True) + self.verticalLayout.addWidget(self.frameText) + self.horizontalLayout_2 = QtWidgets.QHBoxLayout() + self.caseCheck = QtWidgets.QCheckBox(self) + self.caseCheck.setText("Aa") + self.horizontalLayout_2.addWidget(self.caseCheck) + self.searchLine = QtWidgets.QLineEdit(self) + self.searchLine.setPlaceholderText("Search log..") + self.searchLine.setText("") + self.searchLine.setClearButtonEnabled(True) + self.horizontalLayout_2.addWidget(self.searchLine) + self.findButton = QtWidgets.QPushButton(self) + self.findButton.setText("Find") + self.horizontalLayout_2.addWidget(self.findButton) + self.verticalLayout.addLayout(self.horizontalLayout_2) + + self.frameLogCombo.currentIndexChanged.connect(self._selectLog) + self.app.select_frame.connect(self._display_frame_log) + + def _display_frame_log(self, jobObj: job.Job, frameObj: frame.Frame): + jobName = jobObj.name() + frameName = frameObj.name() + frameId = frameObj.id() + self.frameLogCombo.clear() + if jobObj.lokiEnabled(): + self.frameNameLabel.setText(f"{jobName}.{frameName}") + self.client = LokiClient(jobObj.lokiURL()) + maxTries = 5 + tries = 0 + while tries < maxTries: + if self.client.ready() is True: + break + tries += 1 + time.sleep(0.5 * tries) + success, result = self.client.label_values( + label="session_start_time", + start=datetime.datetime.fromtimestamp(jobObj.startTime()), + params={'query': f'{{frame_id="{frameId}"}}'} + ) + if success is True: + labelValues = result.get('data', []) + for unix_timestamp in sorted(labelValues, reverse=True): + query = f'{{session_start_time="{unix_timestamp}", frame_id="{frameId}"}}' + data = [unix_timestamp, query] + self.frameLogCombo.addItem( + _unix_to_datetime(int(float(unix_timestamp))), userData=data + ) + self.frameLogCombo.adjustSize() + else: + pass + + # pylint: disable=unused-argument + def _selectLog(self, index): + self.frameText.clear() + if self.frameLogCombo.currentData(): + timestamp, query = self.frameLogCombo.currentData() + start = datetime.datetime.fromtimestamp(float(timestamp)) + end = datetime.datetime.now() + success, result = self.client.query_range(query=query, + direction=LokiClient.Direction.forward, + limit=1000, start=start, end=end) + if success is True: + for res in result.get('data', {}).get('result', []): + for timestamp, line in res.get('values'): + self.frameText.append(f"
{line}
") + else: + print(success, result) + + +def _unix_to_datetime(unix_timestamp): + """Simple function to convert from timestamp to human readable string""" + return datetime.datetime.fromtimestamp(int(unix_timestamp)).strftime('%Y-%m-%d %H:%M:%S') + + +class LokiViewPlugin(cuegui.AbstractDockWidget.AbstractDockWidget): + """ + Plugin for displaying the log file content for the selected frame with + the ability to perself regex-based search. + """ + + def __init__(self, parent=None): + """ + Create a LogViewPlugin instance + + @param parent: The parent widget + @type parent: QtWidgets.QWidget or None + """ + cuegui.AbstractDockWidget.AbstractDockWidget.__init__( + self, parent, PLUGIN_NAME, QtCore.Qt.BottomDockWidgetArea) + self.logview_widget = LokiViewWidget(self) + self.layout().addWidget(self.logview_widget) diff --git a/proto/job.proto b/proto/job.proto index 0962d7896..05ce3ffc3 100644 --- a/proto/job.proto +++ b/proto/job.proto @@ -641,6 +641,8 @@ message Job { JobStats job_stats = 20; float min_gpus = 21; float max_gpus = 22; + bool loki_enabled = 23; + string loki_url = 24; } // Use to filter the job search. Please note that by searching for non-pending jobs, the output is limited to 200 jobs diff --git a/proto/rqd.proto b/proto/rqd.proto index 327233bb0..04485fe50 100644 --- a/proto/rqd.proto +++ b/proto/rqd.proto @@ -117,6 +117,8 @@ message RunFrame { string os = 25; int64 soft_memory_limit = 26; int64 hard_memory_limit = 27; + bool loki_enabled = 28; + string loki_url = 29; } message RunFrameSeq { diff --git a/pycue/opencue/wrappers/job.py b/pycue/opencue/wrappers/job.py index e582a91bf..c87d19628 100644 --- a/pycue/opencue/wrappers/job.py +++ b/pycue/opencue/wrappers/job.py @@ -814,6 +814,21 @@ def shutdownIfCompleted(self): self.stub.ShutdownIfCompleted(job_pb2.JobShutdownIfCompletedRequest(job=self.data), timeout=Cuebot.Timeout) + def lokiEnabled(self): + """Returns whether or now loki si enabled + + :rtype: bool + :return: Return true if loki is enabled + """ + return self.data.loki_enabled + + def lokiURL(self): + """Returns url for loki server on the job + + :rtype: str + :return: Return URL of loki server of the job + """ + return self.data.loki_url class NestedJob(Job): """This class contains information and actions related to a nested job.""" diff --git a/requirements.txt b/requirements.txt index 83a77fcf6..0a6bc594b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,9 +15,13 @@ pynput==1.7.6 PyYAML==5.1 six==1.16.0 pytest==8.3.3 +urllib3==1.26.20;python_version<="3.9" +urllib3==2.2.3;python_version>"3.9" # Optional requirements # Sentry support for rqd sentry-sdk==2.11.0 -docker==7.1.0 \ No newline at end of file +docker==7.1.0 + +loki-urllib3-client==0.2.2 diff --git a/rqd/pytests/__init__.py b/rqd/pytests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/rqd/pytests/test_rqdlogging.py b/rqd/pytests/test_rqdlogging.py new file mode 100644 index 000000000..416aa30fc --- /dev/null +++ b/rqd/pytests/test_rqdlogging.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python + +# Copyright Contributors to the OpenCue Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Pytests for rqd.rqdlogging""" + + +import mock +import pytest +import rqd.compiled_proto.rqd_pb2 +from rqd.rqlogging import LokiLogger + +@pytest.fixture +@mock.patch('rqd.compiled_proto.rqd_pb2_grpc.RunningFrameStub') +@mock.patch('rqd.compiled_proto.rqd_pb2_grpc.RqdInterfaceStub') +@mock.patch('grpc.insecure_channel', new=mock.MagicMock()) +def runFrame(stubMock, frameStubMock): + rf = rqd.compiled_proto.rqd_pb2.RunFrame() + rf.job_id = "SD6F3S72DJ26236KFS" + rf.job_name = "edu-trn_job-name" + rf.frame_id = "FD1S3I154O646UGSNN" + return rf + +# pylint: disable=redefined-outer-name +def test_LokiLogger(runFrame): + ll = LokiLogger("http://localhost:3100", runFrame) + assert isinstance(ll, LokiLogger) + +def test_LokiLogger_invalid_runFrame(): + rf = None + + with pytest.raises(AttributeError) as excinfo: + LokiLogger("http://localhost:3100", rf) + assert excinfo.type == AttributeError diff --git a/rqd/rqd/rqcore.py b/rqd/rqd/rqcore.py index 4c3a04199..24faad55f 100644 --- a/rqd/rqd/rqcore.py +++ b/rqd/rqd/rqcore.py @@ -1344,7 +1344,10 @@ def run(self): # Setup frame logging try: - self.rqlog = rqd.rqlogging.RqdLogger(runFrame.log_dir_file) + if self.runFrame.loki_enabled: + self.rqlog = rqd.rqlogging.LokiLogger(self.runFrame.loki_url, runFrame) + else: + self.rqlog = rqd.rqlogging.RqdLogger(runFrame.log_dir_file) self.rqlog.waitForFile() # pylint: disable=broad-except except Exception as e: diff --git a/rqd/rqd/rqlogging.py b/rqd/rqd/rqlogging.py index e8878c0d9..b3843809c 100644 --- a/rqd/rqd/rqlogging.py +++ b/rqd/rqd/rqlogging.py @@ -22,6 +22,7 @@ import datetime import platform +from loki_urllib3_client import LokiClient import rqd.rqconstants log = logging.getLogger(__name__) @@ -131,3 +132,56 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): pass + +class LokiLogger(object): + """Class for logging to a loki server. It mimics a file object as much as possible""" + def __init__(self, lokiURL, runFrame): + self.client = LokiClient(url=lokiURL) + self.runFrame = runFrame + self.sessionStartTime = datetime.datetime.now().timestamp() + self.defaultLogData = { + 'host': platform.node(), + 'job_name': self.runFrame.job_name, + 'frame_name': self.runFrame.frame_name, + 'username': self.runFrame.user_name, + 'frame_id': self.runFrame.frame_id, + 'session_start_time': str(self.sessionStartTime) + } + + def waitForFile(self, maxTries=5): + """Waits for the connection to be ready before continuing""" + tries = 0 + while tries < maxTries: + if self.client.ready() is True: + return + tries += 1 + time.sleep(0.5 * tries) + raise IOError("Failed to create loki stream") + + # pylint: disable=unused-argument + def write(self, data, prependTimestamp=False): + """ + Provides write function for writing to loki server. + Ignores prepentTimeStamp which is redundant with Loki + """ + if len(data.strip()) == 0: + return + if isinstance(data, bytes): + data = data.decode('utf-8', errors='ignore') + requestStatus, requestCode = self.client.post(self.defaultLogData, [data.strip()]) + if requestStatus is not True: + raise IOError(f"Failed to write log to loki server with error : {requestCode}") + + def writelines(self, __lines): + """Provides support for writing mutliple lines at a time""" + for line in __lines: + self.write(line) + + def close(self): + """Dummy function since cloasing it not necessary for the http connection""" + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass