[DSIP-99][TaskPlugin] Save task output to a separate file#18098
[DSIP-99][TaskPlugin] Save task output to a separate file#18098Mrhs121 wants to merge 4 commits intoapache:devfrom
Conversation
|
initial commit |
|
This commit was done a long time ago. When I just rebase the code, there were many conflicts. After using AI to automatically solve them, I found many error points. I need to recheck them |
|
|
||
| @Test | ||
| public void testQueryLogInSpecifiedProject() { | ||
| long projectCode = 1L; |
There was a problem hiding this comment.
dc38d34 this commit removed the related service, So I casually removed the useless code from the test as well
I'm not ready yet. I haven't tested myself yet |
|
Feel free to ping me when you are ready to review. @Mrhs121 |
0100189 to
111aeca
Compare
|
| public enum TaskLogType { | ||
|
|
||
| LOG { | ||
|
|
||
| @Override | ||
| public String getLogPath(TaskInstance taskInstance) { | ||
| return taskInstance.getLogPath(); | ||
| } | ||
| }, | ||
| OUTPUT { | ||
|
|
||
| @Override | ||
| public String getLogPath(TaskInstance taskInstance) { | ||
| return taskInstance.getTaskOutputLogPath(); | ||
| } | ||
| }; | ||
|
|
||
| public abstract String getLogPath(TaskInstance taskInstance); | ||
| } |
There was a problem hiding this comment.
It's better to split this kind of logic from enum to a seperate class.
| host varchar(135) DEFAULT NULL, | ||
| execute_path varchar(200) DEFAULT NULL, | ||
| log_path longtext DEFAULT NULL, | ||
| task_output_log_path longtext DEFAULT NULL, |
There was a problem hiding this comment.
| task_output_log_path longtext DEFAULT NULL, | |
| task_output_log_path varchar(255) DEFAULT NULL, |
Don't use longtext
| <logger name="TaskOutput" level="INFO" additivity="false"> | ||
| <appender-ref ref="TASKOUTPUTLOGFILE"/> | ||
| </logger> |
| @@ -205,13 +214,19 @@ private Optional<CompletableFuture<?>> collectPodLogIfNeeded() { | |||
| String line; | |||
| try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) { | |||
| while ((line = reader.readLine()) != null) { | |||
| log.info("[K8S-pod-log-{}]: {}", taskRequest.getTaskName(), line); | |||
| if (StringUtils.isBlank(taskRequest.getTaskOutputLogPath())) { | |||
| log.info("[K8S-pod-log-{}]: {}", taskRequest.getTaskName(), line); | |||
| } else { | |||
| TASK_OUTPUT_LOGGER.info(line); | |||
| } | |||
| } | |||
| } | |||
| } | |||
| } catch (Exception e) { | |||
| log.error("Collect pod log error", e); | |||
| throw new RuntimeException(e); | |||
| } finally { | |||
| LogUtils.removeTaskInstanceLogFullPathMDC(); | |||
There was a problem hiding this comment.
We shouldn't only one logger is enough.
Don't need to do below judge.
if (StringUtils.isBlank(taskRequest.getTaskOutputLogPath())) {
log.info("[K8S-pod-log-{}]: {}", taskRequest.getTaskName(), line);
} else {
TASK_OUTPUT_LOGGER.info(line);
}
| try ( | ||
| LogUtils.MDCAutoClosableContext ignored = | ||
| LogUtils.withTaskOutputLogPathMDC(taskRequest.getTaskOutputLogPath())) { | ||
| for (String line : (Iterable<String>) inReader.lines()::iterator) { | ||
| if (StringUtils.isBlank(taskRequest.getTaskOutputLogPath())) { | ||
| log.info(" -> {}", line); | ||
| } else { | ||
| TASK_OUTPUT_LOGGER.info(line); | ||
| } | ||
| taskOutputParameterParser.appendParseLog(line); | ||
| } | ||
| } finally { | ||
| LogUtils.removeTaskInstanceLogFullPathMDC(); |
There was a problem hiding this comment.
In which case taskRequest.getTaskOutputLogPath() can be empty?
| public Result<ResponseTaskLog> queryTaskLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, | ||
| @RequestParam(value = "taskInstanceId") int taskInstanceId, | ||
| @RequestParam(value = "skipLineNum") int skipNum, | ||
| @RequestParam(value = "limit") int limit) { | ||
| return loggerService.queryTaskLog(loginUser, taskInstanceId, skipNum, limit); | ||
| } |
There was a problem hiding this comment.
Add a request param logType is enough, don't add a new api.
| public TaskInstanceLogFileDownloadResponse getTaskLog(TaskInstance taskInstance) { | ||
| return getLocalWholeLog(taskInstance, TaskLogType.LOG); | ||
| } | ||
|
|
||
| public TaskInstanceLogFileDownloadResponse getTaskOutput(TaskInstance taskInstance) { | ||
| return getLocalWholeLog(taskInstance, TaskLogType.OUTPUT); | ||
| } |
There was a problem hiding this comment.
| public TaskInstanceLogFileDownloadResponse getTaskLog(TaskInstance taskInstance) { | |
| return getLocalWholeLog(taskInstance, TaskLogType.LOG); | |
| } | |
| public TaskInstanceLogFileDownloadResponse getTaskOutput(TaskInstance taskInstance) { | |
| return getLocalWholeLog(taskInstance, TaskLogType.OUTPUT); | |
| } | |
| public TaskInstanceLogFileDownloadResponse getTaskLog(TaskInstance taskInstance, TaskLogType logType) { | |
| return getLocalWholeLog(taskInstance, TaskLogType.LOG); | |
| } |
| public TaskInstanceLogPageQueryResponse getTaskLog(TaskInstance taskInstance, int skipLineNum, int limit) { | ||
| return getLocalPartLog(taskInstance, skipLineNum, limit, TaskLogType.LOG); | ||
| } | ||
|
|
||
| public TaskInstanceLogPageQueryResponse getTaskOutput(TaskInstance taskInstance, int skipLineNum, int limit) { | ||
| return getLocalPartLog(taskInstance, skipLineNum, limit, TaskLogType.OUTPUT); | ||
| } |
There was a problem hiding this comment.
| public TaskInstanceLogPageQueryResponse getTaskLog(TaskInstance taskInstance, int skipLineNum, int limit) { | |
| return getLocalPartLog(taskInstance, skipLineNum, limit, TaskLogType.LOG); | |
| } | |
| public TaskInstanceLogPageQueryResponse getTaskOutput(TaskInstance taskInstance, int skipLineNum, int limit) { | |
| return getLocalPartLog(taskInstance, skipLineNum, limit, TaskLogType.OUTPUT); | |
| } | |
| public TaskInstanceLogPageQueryResponse getTaskLog(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType logType) { | |
| return getLocalPartLog(taskInstance, skipLineNum, limit, TaskLogType.LOG); | |
| } |
|
@ruanwenjun Thanks for the review. I summarized the feedback and these points make sense, I'll update.
Regarding this idea :#17791 (comment). My understanding is that the concern is not only about adding another file path field for task output, but also about the long-term storage model. Instead of storing separate file paths like log_path and task_output_log_path, a more extensible approach would be to store a single directory path such as task_out_path, and place all task-instance-generated files under it, for example log, output, and possibly other generated files in the future. maybe such as:
|
| private void logHttpResponse(String message, int statusCode, String checkCondition, String body) { | ||
| if (StringUtils.isBlank(taskExecutionContext.getTaskOutputLogPath())) { | ||
| if (checkCondition == null) { | ||
| log.info(message, httpParameters.getUrl(), statusCode, body); | ||
| } else { | ||
| log.error(message, httpParameters.getUrl(), statusCode, checkCondition, body); | ||
| } | ||
| return; | ||
| } | ||
| LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); | ||
| try ( | ||
| LogUtils.MDCAutoClosableContext ignored = | ||
| LogUtils.withTaskOutputLogPathMDC(taskExecutionContext.getTaskOutputLogPath())) { | ||
| if (checkCondition == null) { | ||
| TASK_OUTPUT_LOGGER.info(message, httpParameters.getUrl(), statusCode, body); | ||
| } else { | ||
| TASK_OUTPUT_LOGGER.info(message, httpParameters.getUrl(), statusCode, checkCondition, body); | ||
| } | ||
| } finally { | ||
| LogUtils.removeTaskInstanceLogFullPathMDC(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Logic like this should not be customized by users, but should be handled at spi level.
| </appender> | ||
| </sift> | ||
| </appender> | ||
| <appender name="TASKOUTPUTLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender"> |
There was a problem hiding this comment.
We should add an automatic cleaning policy like other logs.




Was this PR generated or assisted by AI?
Purpose of the pull request
close #17791
Brief change log
Verify this pull request
This pull request is code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(or)
Pull Request Notice
Pull Request Notice
If your pull request contains incompatible change, you should also add it to
docs/docs/en/guide/upgrade/incompatible.md