Skip to content

Commit

Permalink
feat: Media服务优化 #2834
Browse files Browse the repository at this point in the history
* bug: 修复视频转录过程中进程挂了 #2775

* feat: 支持记录作者元数据 #2817

* feat: 支持未知项目的指标上报 #2820
  • Loading branch information
felixncheng authored Dec 12, 2024
1 parent 901cd2f commit 4429dfc
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ class ArtifactMetricsConfiguration {

@Bean
fun artifactMetricsExporter(
customMetricsExporter: CustomMetricsExporter? = null
customMetricsExporter: CustomMetricsExporter? = null,
artifactMetricsProperties: ArtifactMetricsProperties,
): ArtifactMetricsExporter {
return ArtifactMetricsExporter(customMetricsExporter)
return ArtifactMetricsExporter(customMetricsExporter, artifactMetricsProperties.allowUnknownProjectExport)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,9 @@ data class ArtifactMetricsProperties(
/**
* 超过该大小的文件未命中缓存时将计入大文件缓存未命中监控数据
*/
var largeFileThreshold: DataSize = DataSize.ofGigabytes(3L)
var largeFileThreshold: DataSize = DataSize.ofGigabytes(3L),
/**
* 允许上报未知项目信息
* */
var allowUnknownProjectExport: Boolean = false,
)
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ import org.slf4j.LoggerFactory
import java.util.Queue

class ArtifactMetricsExporter(
private val customMetricsExporter: CustomMetricsExporter? = null
private val customMetricsExporter: CustomMetricsExporter? = null,
private val allowUnknownProjectExport: Boolean,
) {

fun export(queue: Queue<ArtifactTransferRecord>) {
Expand All @@ -51,7 +52,11 @@ class ArtifactMetricsExporter(
val count: Int = queue.size
for (i in 0 until count) {
val item = queue.poll()
if (item.project == StringPool.UNKNOWN || item.fullPath == StringPool.UNKNOWN) continue
if ((item.project == StringPool.UNKNOWN || item.fullPath == StringPool.UNKNOWN) &&
!allowUnknownProjectExport
) {
continue
}
val labels = convertRecordToMap(item)
val metrics = TypeOfMetricsItem.ARTIFACT_TRANSFER_RATE
val metricItem = MetricsItem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class StreamService(
name: String,
mode: StreamMode,
userId: String,
author: String,
remux: Boolean = false,
saveType: MediaType = MediaType.RAW,
transcodeExtraParams: String? = null,
Expand All @@ -112,6 +113,7 @@ class StreamService(
transcodeService,
repo,
userId,
author,
STREAM_PATH,
transcodeConfig,
)
Expand All @@ -129,7 +131,7 @@ class StreamService(
stream.saveAs()
}
stream.startPublish()
logger.info("User[$userId] publish stream $streamId")
logger.info("User[$author] publish stream $streamId")
return stream
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class MediaArtifactFileConsumer(
private val transcodeService: TranscodeService,
private val repo: RepositoryDetail,
private val userId: String,
private val author: String,
private val path: String,
private val transcodeConfig: TranscodeConfig? = null,
) : FileConsumer {
Expand All @@ -34,7 +35,7 @@ class MediaArtifactFileConsumer(
override fun accept(file: ArtifactFile, name: String) {
val filePath = "$path/$name"
val artifactInfo = ArtifactInfo(repo.projectId, repo.name, filePath)
val nodeCreateRequest = buildNodeCreateRequest(artifactInfo, file, userId)
val nodeCreateRequest = buildNodeCreateRequest(artifactInfo, file, userId, author)
storageManager.storeArtifactFile(nodeCreateRequest, file, repo.storageCredentials)
if (transcodeConfig != null) {
transcodeService.transcode(artifactInfo, transcodeConfig, userId)
Expand All @@ -45,6 +46,7 @@ class MediaArtifactFileConsumer(
artifactInfo: ArtifactInfo,
file: ArtifactFile,
userId: String,
author: String,
): NodeCreateRequest {
with(artifactInfo) {
val endTime = System.currentTimeMillis()
Expand All @@ -60,6 +62,7 @@ class MediaArtifactFileConsumer(
nodeMetadata = listOf(
MetadataModel(key = METADATA_KEY_MEDIA_START_TIME, value = startTime, system = true),
MetadataModel(key = METADATA_KEY_MEDIA_STOP_TIME, value = endTime, system = true),
MetadataModel(key = METADATA_KEY_MEDIA_AUTHOR, value = author, system = true),
),
)
}
Expand All @@ -68,5 +71,6 @@ class MediaArtifactFileConsumer(
companion object {
private const val METADATA_KEY_MEDIA_START_TIME = "media.startTime"
private const val METADATA_KEY_MEDIA_STOP_TIME = "media.stopTime"
private const val METADATA_KEY_MEDIA_AUTHOR = "media.author"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ class Mux {
* @param inputStream 视频输入流
* @param output 封装后的文件
* */
constructor(inputStream: InputStream, output: File) : this() {
constructor(inputStream: InputStream, output: File, name: String) : this() {
this.inputStream = inputStream
this.outputFile = output
this.fileName = inputStream.toString()
this.fileName = name
}

constructor()
Expand All @@ -53,6 +53,10 @@ class Mux {
private var avio: AVIOContext? = null
private var running: AtomicBoolean = AtomicBoolean(false)
private var stopFlag: AtomicBoolean = AtomicBoolean(false)
private var writeHeader = false

@Volatile
var packetCount = 0

fun start() {
if (!running.compareAndSet(false, true)) {
Expand All @@ -78,17 +82,16 @@ class Mux {
)
ifmtCtx!!.pb(avio)
}
check(avformat.avformat_open_input(ifmtCtx, fileName, null, null) >= 0) { "open failed" }
check(avformat.avformat_find_stream_info(ifmtCtx, null as? PointerPointer<*>) >= 0) {
"can't find stream info"
}
var ret = avformat.avformat_open_input(ifmtCtx, fileName, null, null)
check(ret >= 0) { "open failed [$ret]" }
ret = avformat.avformat_find_stream_info(ifmtCtx, null as? PointerPointer<*>)
check(ret >= 0) { "can't find stream info [$ret]" }
if (logger.isDebugEnabled) {
avformat.av_dump_format(ifmtCtx, 0, fileName, 0)
}
val outputFilePath = outputFile!!.absolutePath
check(avformat.avformat_alloc_output_context2(ofmtCtx, null, null, outputFilePath) >= 0) {
"create output ctx error"
}
ret = avformat.avformat_alloc_output_context2(ofmtCtx, null, null, outputFilePath)
check(ret >= 0) { "create output ctx error [$ret]" }
val streamMapping = mutableMapOf<Int, Int>()
var streamIndex = 0

Expand Down Expand Up @@ -126,9 +129,8 @@ class Mux {
check(avformat.avformat_write_header(ofmtCtx, null as? PointerPointer<*>) >= 0) {
"Error occurred when opening output file"
}

writeHeader = true
var dts = 0L
var count = 0L
while (!stopFlag.get()) {
if (avformat.av_read_frame(ifmtCtx, pkt) < 0) {
break
Expand All @@ -150,16 +152,17 @@ class Mux {
pkt!!.pos(-1)
logPacket(ofmtCtx, pkt!!, "out")
check(avformat.av_interleaved_write_frame(ofmtCtx, pkt) >= 0) { "write frame error" }
count++
packetCount++
}
logger.info("Complete remux $fileName,size ${outputFile!!.length()} B,$count packet.")
logger.info("Complete remux $fileName,size ${outputFile!!.length()} B,$packetCount packet.")
} catch (e: Exception) {
logger.error("Remux error:", e)
throw e
} finally {
release()
scope.close()
running.set(false)
logger.info("Finish remux $fileName to ${outputFile!!.absolutePath}")
}
}

Expand Down Expand Up @@ -196,10 +199,14 @@ class Mux {
}

if (ofmtCtx != null) {
avformat.av_write_trailer(ofmtCtx)
logger.info("Finish remux $fileName to ${outputFile!!.absolutePath}")
if (ofmtCtx!!.oformat().flags() and avformat.AVFMT_NOFILE == 0) {
avformat.avio_closep(ofmtCtx!!.pb())
if (writeHeader) {
avformat.av_write_trailer(ofmtCtx)
}
val oformat = ofmtCtx!!.oformat()
if (oformat != null) {
if (oformat.flags() and avformat.AVFMT_NOFILE == 0 && !ofmtCtx!!.pb().isNull) {
avformat.avio_closep(ofmtCtx!!.pb())
}
}
avformat.avformat_free_context(ofmtCtx)
ofmtCtx = null
Expand Down Expand Up @@ -263,6 +270,7 @@ class Mux {
val b = ByteArray(buf_size)
val size = inputStream.read(b)
return if (size < 0) {
logger.info("input end")
avutil.AVERROR_EOF
} else {
buf.put(b, 0, size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ class RemuxRecordingListener(
* */
private var fileName: String? = null

private var startFailed = AtomicBoolean(false)

override fun handler(packet: StreamPacket) {
if (startFailed.get()) {
return
}
pipeOut.write(packet.getData())
}

Expand All @@ -63,9 +68,14 @@ class RemuxRecordingListener(
fileName = "$name.$fileType"
val tempFileName = StringPool.randomStringByLongValue(REMUX_PREFIX, ".$fileType")
tempFilePath = Paths.get(path, tempFileName)
mux = Mux(pipeIn, tempFilePath!!.toFile())
mux = Mux(pipeIn, tempFilePath!!.toFile(), name)
val remuxFuture = threadPool.submit {
mux!!.start()
try {
mux!!.start()
} catch (e: Exception) {
logger.error("Mux start failed", e)
startFailed.set(true)
}
}
if (remuxFuture.isDone) {
throw IllegalStateException("Remux start error")
Expand All @@ -80,7 +90,11 @@ class RemuxRecordingListener(
pipeOut.close()
mux!!.stop()
pipeIn.close()
fileConsumer.accept(tempFilePath!!.toFile(), fileName!!)
if (mux!!.packetCount > 0) {
fileConsumer.accept(tempFilePath!!.toFile(), fileName!!)
} else {
logger.warn("empty stream $fileName")
}
}
} finally {
Files.deleteIfExists(tempFilePath)
Expand Down

0 comments on commit 4429dfc

Please sign in to comment.