From 42341b4e10308100f0b42bf0ef02c26e6cd22032 Mon Sep 17 00:00:00 2001 From: minjaesong Date: Fri, 12 Dec 2025 16:26:30 +0900 Subject: [PATCH] TAV-DT multithreaded decoding --- video_encoder/src/decoder_tav_dt.c | 666 +++++++++++++++++++---------- 1 file changed, 435 insertions(+), 231 deletions(-) diff --git a/video_encoder/src/decoder_tav_dt.c b/video_encoder/src/decoder_tav_dt.c index 6a5acde..e1460e9 100644 --- a/video_encoder/src/decoder_tav_dt.c +++ b/video_encoder/src/decoder_tav_dt.c @@ -71,25 +71,26 @@ static const int QUALITY_CG[] = {148, 133, 113, 99, 76, 39}; // Multithreading Structures // ============================================================================= -#define DECODE_SLOT_EMPTY 0 -#define DECODE_SLOT_PENDING 1 -#define DECODE_SLOT_DONE 2 +#define DECODE_SLOT_EMPTY 0 +#define DECODE_SLOT_PENDING 1 +#define DECODE_SLOT_PROCESSING 2 +#define DECODE_SLOT_DONE 3 // GOP decode job structure typedef struct { // Input - uint8_t *compressed_data; // Raw GOP data to decode + uint8_t *compressed_data; // Raw GOP data to decode (owned by job) size_t compressed_size; int gop_size; // Number of frames in this GOP int job_id; // Sequential job ID for ordering output // Output uint8_t **rgb_frames; // Decoded RGB24 frames [gop_size] - int frames_allocated; // How many frames are allocated + size_t frame_size; // Size of each frame in bytes int decode_result; // 0 = success, -1 = error // Status - volatile int status; + volatile int status; // DECODE_SLOT_EMPTY, PENDING, or DONE } gop_decode_job_t; /** @@ -375,6 +376,217 @@ static int read_and_decode_header(dt_decoder_t *dec, dt_packet_header_t *header) return 0; } +// ============================================================================= +// Multithreading Support +// ============================================================================= + +/** + * Worker thread function for parallel GOP decoding + */ +static void *decoder_worker_thread(void *arg) { + dt_decoder_t *dec = (dt_decoder_t *)arg; + int thread_id = -1; + + // Find our thread ID + for (int i = 0; i < dec->num_threads; i++) { + if (pthread_equal(dec->worker_threads[i], pthread_self())) { + thread_id = i; + break; + } + } + + if (thread_id < 0) { + fprintf(stderr, "Error: Worker thread couldn't find its ID\n"); + return NULL; + } + + tav_video_context_t *video_ctx = dec->worker_video_ctx[thread_id]; + + while (1) { + pthread_mutex_lock(&dec->mutex); + + // Look for a pending job and claim it + int job_idx = -1; + for (int i = 0; i < dec->num_slots; i++) { + if (dec->slots[i].status == DECODE_SLOT_PENDING) { + job_idx = i; + dec->slots[i].status = DECODE_SLOT_PROCESSING; // Claim it - prevents other threads from picking it + break; + } + } + + if (job_idx < 0) { + // No jobs available, check if we should exit + if (dec->threads_should_exit) { + pthread_mutex_unlock(&dec->mutex); + break; + } + + // Wait for a job + pthread_cond_wait(&dec->cond_job_available, &dec->mutex); + pthread_mutex_unlock(&dec->mutex); + continue; + } + + pthread_mutex_unlock(&dec->mutex); + + // Decode this GOP + gop_decode_job_t *job = &dec->slots[job_idx]; + + // The compressed data format: [type(1)][gop_size(1)][size(4)][zstd_data] + const uint8_t *zstd_data = job->compressed_data + 6; + size_t zstd_size = job->compressed_size > 6 ? job->compressed_size - 6 : 0; + + job->decode_result = tav_video_decode_gop(video_ctx, zstd_data, zstd_size, + job->gop_size, job->rgb_frames); + + // Mark as done + pthread_mutex_lock(&dec->mutex); + job->status = DECODE_SLOT_DONE; + dec->jobs_completed++; + pthread_cond_broadcast(&dec->cond_slot_free); + pthread_mutex_unlock(&dec->mutex); + } + + return NULL; +} + +/** + * Initialize decoder threads + */ +static int init_decoder_threads(dt_decoder_t *dec) { + if (dec->num_threads <= 1) { + return 0; // Single-threaded, nothing to initialize + } + + dec->num_slots = dec->num_threads + 2; // Pipeline with lookahead + dec->slots = calloc(dec->num_slots, sizeof(gop_decode_job_t)); + if (!dec->slots) { + fprintf(stderr, "Error: Cannot allocate decode slots\n"); + return -1; + } + + // Initialize slots + for (int i = 0; i < dec->num_slots; i++) { + dec->slots[i].status = DECODE_SLOT_EMPTY; + dec->slots[i].job_id = -1; + dec->slots[i].rgb_frames = NULL; + dec->slots[i].compressed_data = NULL; + } + + // Create per-thread video decoder contexts + dec->worker_video_ctx = calloc(dec->num_threads, sizeof(tav_video_context_t*)); + if (!dec->worker_video_ctx) { + free(dec->slots); + return -1; + } + + tav_video_params_t vparams; + vparams.width = dec->width; + vparams.height = dec->is_interlaced ? dec->height / 2 : dec->height; + vparams.decomp_levels = DT_SPATIAL_LEVELS; + vparams.temporal_levels = DT_TEMPORAL_LEVELS; + vparams.wavelet_filter = 1; // CDF 9/7 + vparams.temporal_wavelet = 255; // Haar + vparams.entropy_coder = 1; // EZBC + vparams.channel_layout = 0; // YCoCg-R + vparams.perceptual_tuning = 1; + vparams.quantiser_y = QUALITY_Y[dec->quality_index]; + vparams.quantiser_co = QUALITY_CO[dec->quality_index]; + vparams.quantiser_cg = QUALITY_CG[dec->quality_index]; + vparams.encoder_preset = 0x01; // Sports + vparams.monoblock = 1; + + for (int i = 0; i < dec->num_threads; i++) { + dec->worker_video_ctx[i] = tav_video_create(&vparams); + if (!dec->worker_video_ctx[i]) { + fprintf(stderr, "Error: Cannot create video decoder for thread %d\n", i); + return -1; + } + } + + // Initialize threading primitives + pthread_mutex_init(&dec->mutex, NULL); + pthread_cond_init(&dec->cond_job_available, NULL); + pthread_cond_init(&dec->cond_slot_free, NULL); + dec->threads_should_exit = 0; + dec->next_write_slot = 0; + dec->jobs_submitted = 0; + dec->jobs_completed = 0; + + // Create worker threads + dec->worker_threads = calloc(dec->num_threads, sizeof(pthread_t)); + if (!dec->worker_threads) { + return -1; + } + + for (int i = 0; i < dec->num_threads; i++) { + if (pthread_create(&dec->worker_threads[i], NULL, decoder_worker_thread, dec) != 0) { + fprintf(stderr, "Error: Cannot create worker thread %d\n", i); + return -1; + } + } + + if (dec->verbose) { + printf("Initialized %d decoder threads\n", dec->num_threads); + } + + return 0; +} + +/** + * Cleanup decoder threads + */ +static void cleanup_decoder_threads(dt_decoder_t *dec) { + if (dec->num_threads <= 1) { + return; + } + + // Signal threads to exit + pthread_mutex_lock(&dec->mutex); + dec->threads_should_exit = 1; + pthread_cond_broadcast(&dec->cond_job_available); + pthread_mutex_unlock(&dec->mutex); + + // Wait for threads + if (dec->worker_threads) { + for (int i = 0; i < dec->num_threads; i++) { + pthread_join(dec->worker_threads[i], NULL); + } + free(dec->worker_threads); + } + + // Free video contexts + if (dec->worker_video_ctx) { + for (int i = 0; i < dec->num_threads; i++) { + if (dec->worker_video_ctx[i]) { + tav_video_free(dec->worker_video_ctx[i]); + } + } + free(dec->worker_video_ctx); + } + + // Free slots + if (dec->slots) { + for (int i = 0; i < dec->num_slots; i++) { + if (dec->slots[i].rgb_frames) { + for (int f = 0; f < dec->slots[i].gop_size; f++) { + free(dec->slots[i].rgb_frames[f]); + } + free(dec->slots[i].rgb_frames); + } + if (dec->slots[i].compressed_data) { + free(dec->slots[i].compressed_data); + } + } + free(dec->slots); + } + + pthread_mutex_destroy(&dec->mutex); + pthread_cond_destroy(&dec->cond_job_available); + pthread_cond_destroy(&dec->cond_slot_free); +} + // ============================================================================= // Subpacket Decoding // ============================================================================= @@ -490,6 +702,161 @@ static int decode_audio_subpacket(dt_decoder_t *dec, const uint8_t *data, size_t return 0; } +/** + * Multithreaded video decoding - submit GOP to worker pool + */ +static int decode_video_subpacket_mt(dt_decoder_t *dec, const uint8_t *data, size_t data_len, + size_t *consumed) { + // Minimum: 16 byte LDPC header + if (data_len < DT_TAV_HEADER_SIZE * 2) return -1; + + size_t offset = 0; + + // LDPC decode TAV header (16 bytes -> 8 bytes) + uint8_t decoded_tav_header[DT_TAV_HEADER_SIZE]; + int ldpc_result = ldpc_decode(data + offset, DT_TAV_HEADER_SIZE * 2, decoded_tav_header); + if (ldpc_result < 0) { + if (dec->verbose) { + fprintf(stderr, "Warning: LDPC decode failed for TAV header\n"); + } + memcpy(decoded_tav_header, data + offset, DT_TAV_HEADER_SIZE); + } else if (ldpc_result > 0) { + dec->fec_corrections++; + } + offset += DT_TAV_HEADER_SIZE * 2; + + // Parse TAV header + uint8_t gop_size = decoded_tav_header[0]; + uint32_t compressed_size; + uint32_t rs_block_count; + + memcpy(&compressed_size, decoded_tav_header + 1, 4); + rs_block_count = decoded_tav_header[5] | + ((uint32_t)decoded_tav_header[6] << 8) | + ((uint32_t)decoded_tav_header[7] << 16); + + // Calculate RS payload size + size_t rs_total = rs_block_count * RS_BLOCK_SIZE; + + if (offset + rs_total > data_len) { + *consumed = data_len; + return -1; + } + + // RS decode payload + uint8_t *rs_data = malloc(rs_total); + if (!rs_data) return -1; + memcpy(rs_data, data + offset, rs_total); + + uint8_t *decoded_payload = malloc(compressed_size); + if (!decoded_payload) { + free(rs_data); + return -1; + } + + int rs_result = rs_decode_blocks(rs_data, rs_total, decoded_payload, compressed_size); + if (rs_result > 0) { + dec->fec_corrections += rs_result; + } + free(rs_data); + + // Lazy initialization of multithreading (after first packet header is known) + if (!dec->worker_threads && dec->num_threads > 1) { + if (init_decoder_threads(dec) != 0) { + fprintf(stderr, "Error: Cannot initialize decoder threads, falling back to single-threaded\n"); + dec->num_threads = 1; + // Fall back to single-threaded decoding for this packet + free(decoded_payload); + *consumed = offset + rs_total; + return -1; + } + if (dec->verbose) { + printf("Initialized multithreaded decoding: %d threads\n", dec->num_threads); + } + } + + // Find an empty slot + int slot_idx = -1; + pthread_mutex_lock(&dec->mutex); + + while (slot_idx < 0) { + // Try to write completed GOPs first + for (int i = 0; i < dec->num_slots; i++) { + if (dec->slots[i].status == DECODE_SLOT_DONE && + dec->slots[i].job_id == dec->next_write_slot) { + + gop_decode_job_t *job = &dec->slots[i]; + pthread_mutex_unlock(&dec->mutex); + + // Write frames to temp file + if (job->decode_result == 0 && dec->video_temp_fp) { + for (int f = 0; f < job->gop_size; f++) { + fwrite(job->rgb_frames[f], 1, job->frame_size, dec->video_temp_fp); + dec->frames_decoded++; + } + } + + pthread_mutex_lock(&dec->mutex); + + // Free job resources while holding mutex + for (int f = 0; f < job->gop_size; f++) { + free(job->rgb_frames[f]); + } + free(job->rgb_frames); + free(job->compressed_data); + + job->status = DECODE_SLOT_EMPTY; + job->rgb_frames = NULL; + job->compressed_data = NULL; + dec->next_write_slot++; + break; + } + } + + // Look for empty slot + for (int i = 0; i < dec->num_slots; i++) { + if (dec->slots[i].status == DECODE_SLOT_EMPTY) { + slot_idx = i; + break; + } + } + + if (slot_idx < 0) { + // Wait for a slot to become available + pthread_cond_wait(&dec->cond_slot_free, &dec->mutex); + } + } + + // Fill the slot + gop_decode_job_t *job = &dec->slots[slot_idx]; + + int internal_height = dec->is_interlaced ? dec->height / 2 : dec->height; + size_t frame_size = dec->width * internal_height * 3; + + job->compressed_data = decoded_payload; // Transfer ownership + job->compressed_size = compressed_size; + job->gop_size = gop_size; + job->job_id = dec->jobs_submitted++; + job->frame_size = frame_size; + job->decode_result = -1; + + // Allocate frame buffers + job->rgb_frames = malloc(gop_size * sizeof(uint8_t*)); + for (int i = 0; i < gop_size; i++) { + job->rgb_frames[i] = malloc(frame_size); + } + + // Submit job + job->status = DECODE_SLOT_PENDING; + pthread_cond_broadcast(&dec->cond_job_available); + pthread_mutex_unlock(&dec->mutex); + + offset += rs_total; + *consumed = offset; + + return 0; +} + static int decode_video_subpacket(dt_decoder_t *dec, const uint8_t *data, size_t data_len, size_t *consumed) { // Minimum: 16 byte LDPC header @@ -797,229 +1164,6 @@ static int spawn_ffmpeg(dt_decoder_t *dec) { // ============================================================================= // Multithreading Support -// ============================================================================= - -// Worker thread function - decodes GOPs in parallel -static void *decoder_worker_thread(void *arg) { - dt_decoder_t *dec = (dt_decoder_t *)arg; - - // Get thread index by finding our thread ID in the array - int thread_idx = -1; - pthread_t self = pthread_self(); - for (int i = 0; i < dec->num_threads; i++) { - if (pthread_equal(dec->worker_threads[i], self)) { - thread_idx = i; - break; - } - } - if (thread_idx < 0) thread_idx = 0; // Fallback - - tav_video_context_t *my_video_ctx = dec->worker_video_ctx[thread_idx]; - - while (1) { - pthread_mutex_lock(&dec->mutex); - - // Find a pending slot to work on - int slot_idx = -1; - while (slot_idx < 0 && !dec->threads_should_exit) { - for (int i = 0; i < dec->num_slots; i++) { - if (dec->slots[i].status == DECODE_SLOT_PENDING && - dec->slots[i].compressed_data != NULL) { - dec->slots[i].status = DECODE_SLOT_DONE; // Claim it temporarily - slot_idx = i; - break; - } - } - if (slot_idx < 0 && !dec->threads_should_exit) { - pthread_cond_wait(&dec->cond_job_available, &dec->mutex); - } - } - - if (dec->threads_should_exit && slot_idx < 0) { - pthread_mutex_unlock(&dec->mutex); - break; - } - - pthread_mutex_unlock(&dec->mutex); - - if (slot_idx < 0) continue; - - gop_decode_job_t *job = &dec->slots[slot_idx]; - - // Decode GOP using our thread's decoder context - job->decode_result = tav_video_decode_gop( - my_video_ctx, - job->compressed_data, - job->compressed_size, - job->gop_size, - job->rgb_frames - ); - - // Free compressed data - free(job->compressed_data); - job->compressed_data = NULL; - - // Mark as done - pthread_mutex_lock(&dec->mutex); - job->status = DECODE_SLOT_DONE; - dec->jobs_completed++; - pthread_cond_broadcast(&dec->cond_slot_free); - pthread_mutex_unlock(&dec->mutex); - } - - return NULL; -} - -static int init_decoder_threads(dt_decoder_t *dec) { - if (dec->num_threads <= 0) { - return 0; // Single-threaded mode - } - - // Limit threads - if (dec->num_threads > MAX_DECODE_THREADS) { - dec->num_threads = MAX_DECODE_THREADS; - } - - // Number of slots = threads + 2 for pipelining - dec->num_slots = dec->num_threads + 2; - - // Allocate slots - dec->slots = calloc(dec->num_slots, sizeof(gop_decode_job_t)); - if (!dec->slots) { - fprintf(stderr, "Error: Failed to allocate decode slots\n"); - return -1; - } - - // Allocate frame buffers for each slot - int internal_height = dec->is_interlaced ? dec->height / 2 : dec->height; - size_t frame_size = dec->width * internal_height * 3; - int max_gop_size = 16; // TAV-DT uses fixed 16-frame GOPs - - for (int i = 0; i < dec->num_slots; i++) { - dec->slots[i].rgb_frames = malloc(max_gop_size * sizeof(uint8_t*)); - if (!dec->slots[i].rgb_frames) { - fprintf(stderr, "Error: Failed to allocate frame pointers for slot %d\n", i); - return -1; - } - for (int f = 0; f < max_gop_size; f++) { - dec->slots[i].rgb_frames[f] = malloc(frame_size); - if (!dec->slots[i].rgb_frames[f]) { - fprintf(stderr, "Error: Failed to allocate frame buffer for slot %d\n", i); - return -1; - } - } - dec->slots[i].frames_allocated = max_gop_size; - dec->slots[i].status = DECODE_SLOT_EMPTY; - dec->slots[i].job_id = -1; - } - - // Create per-thread video decoder contexts - dec->worker_video_ctx = malloc(dec->num_threads * sizeof(tav_video_context_t*)); - if (!dec->worker_video_ctx) { - fprintf(stderr, "Error: Failed to allocate worker video contexts\n"); - return -1; - } - - tav_video_params_t video_params = { - .width = dec->width, - .height = internal_height, - .decomp_levels = DT_SPATIAL_LEVELS, - .temporal_levels = DT_TEMPORAL_LEVELS, - .wavelet_filter = 1, // CDF 9/7 - .temporal_wavelet = 255, // Haar - .entropy_coder = 1, // EZBC - .channel_layout = 0, // YCoCg-R - .perceptual_tuning = 1, - .quantiser_y = QUALITY_Y[dec->quality_index], - .quantiser_co = QUALITY_CO[dec->quality_index], - .quantiser_cg = QUALITY_CG[dec->quality_index], - .encoder_preset = 0x01, // Sports - .monoblock = 1 - }; - - for (int i = 0; i < dec->num_threads; i++) { - dec->worker_video_ctx[i] = tav_video_create(&video_params); - if (!dec->worker_video_ctx[i]) { - fprintf(stderr, "Error: Failed to create video context for thread %d\n", i); - return -1; - } - } - - // Initialize synchronization primitives - pthread_mutex_init(&dec->mutex, NULL); - pthread_cond_init(&dec->cond_job_available, NULL); - pthread_cond_init(&dec->cond_slot_free, NULL); - dec->threads_should_exit = 0; - dec->next_write_slot = 0; - dec->jobs_submitted = 0; - dec->jobs_completed = 0; - - // Create worker threads - dec->worker_threads = malloc(dec->num_threads * sizeof(pthread_t)); - if (!dec->worker_threads) { - fprintf(stderr, "Error: Failed to allocate worker threads\n"); - return -1; - } - - for (int i = 0; i < dec->num_threads; i++) { - if (pthread_create(&dec->worker_threads[i], NULL, decoder_worker_thread, dec) != 0) { - fprintf(stderr, "Error: Failed to create worker thread %d\n", i); - return -1; - } - } - - if (dec->verbose) { - printf("Initialized %d decoder worker threads with %d slots\n", - dec->num_threads, dec->num_slots); - } - - return 0; -} - -static void cleanup_decoder_threads(dt_decoder_t *dec) { - if (dec->num_threads <= 0) return; - - // Signal threads to exit - pthread_mutex_lock(&dec->mutex); - dec->threads_should_exit = 1; - pthread_cond_broadcast(&dec->cond_job_available); - pthread_mutex_unlock(&dec->mutex); - - // Wait for threads to finish - for (int i = 0; i < dec->num_threads; i++) { - pthread_join(dec->worker_threads[i], NULL); - } - free(dec->worker_threads); - dec->worker_threads = NULL; - - // Free per-thread video contexts - for (int i = 0; i < dec->num_threads; i++) { - tav_video_free(dec->worker_video_ctx[i]); - } - free(dec->worker_video_ctx); - dec->worker_video_ctx = NULL; - - // Free slots - for (int i = 0; i < dec->num_slots; i++) { - if (dec->slots[i].rgb_frames) { - for (int f = 0; f < dec->slots[i].frames_allocated; f++) { - free(dec->slots[i].rgb_frames[f]); - } - free(dec->slots[i].rgb_frames); - } - if (dec->slots[i].compressed_data) { - free(dec->slots[i].compressed_data); - } - } - free(dec->slots); - dec->slots = NULL; - - // Destroy sync primitives - pthread_mutex_destroy(&dec->mutex); - pthread_cond_destroy(&dec->cond_job_available); - pthread_cond_destroy(&dec->cond_slot_free); -} - // ============================================================================= // Main Decoding Loop // ============================================================================= @@ -1067,8 +1211,13 @@ static int process_packet(dt_decoder_t *dec) { // Process TAV subpacket (video comes after audio) if (header.offset_to_video < header.packet_size) { size_t tav_consumed = 0; - decode_video_subpacket(dec, packet_data + header.offset_to_video, - header.packet_size - header.offset_to_video, &tav_consumed); + if (dec->num_threads > 1) { + decode_video_subpacket_mt(dec, packet_data + header.offset_to_video, + header.packet_size - header.offset_to_video, &tav_consumed); + } else { + decode_video_subpacket(dec, packet_data + header.offset_to_video, + header.packet_size - header.offset_to_video, &tav_consumed); + } } dec->packets_processed++; @@ -1104,6 +1253,9 @@ static int run_decoder(dt_decoder_t *dec) { fprintf(stderr, "Warning: Cannot create temp video file, video will be skipped\n"); } + // Note: Multithreading will be initialized lazily after reading first packet header + // (need to know dimensions and quality settings first) + // Decode all packets if (dec->verbose) { printf("Decoding TAV-DT stream...\n"); @@ -1114,6 +1266,58 @@ static int run_decoder(dt_decoder_t *dec) { // Progress is shown in process_packet } + // Flush remaining GOPs in multithreaded mode + if (dec->num_threads > 1) { + pthread_mutex_lock(&dec->mutex); + + // Write all remaining completed GOPs in order + while (dec->next_write_slot < dec->jobs_submitted) { + int found = -1; + for (int i = 0; i < dec->num_slots; i++) { + if (dec->slots[i].status == DECODE_SLOT_DONE && + dec->slots[i].job_id == dec->next_write_slot) { + found = i; + break; + } + } + + if (found >= 0) { + gop_decode_job_t *job = &dec->slots[found]; + pthread_mutex_unlock(&dec->mutex); + + // Write frames + if (job->decode_result == 0 && dec->video_temp_fp) { + for (int f = 0; f < job->gop_size; f++) { + fwrite(job->rgb_frames[f], 1, job->frame_size, dec->video_temp_fp); + dec->frames_decoded++; + } + } + + pthread_mutex_lock(&dec->mutex); + + // Free resources while holding mutex + for (int f = 0; f < job->gop_size; f++) { + free(job->rgb_frames[f]); + } + free(job->rgb_frames); + free(job->compressed_data); + + job->status = DECODE_SLOT_EMPTY; + job->rgb_frames = NULL; + job->compressed_data = NULL; + dec->next_write_slot++; + } else { + // Wait for the GOP to complete + pthread_cond_wait(&dec->cond_slot_free, &dec->mutex); + } + } + + pthread_mutex_unlock(&dec->mutex); + + // Cleanup threads + cleanup_decoder_threads(dec); + } + // Close temp files for reading by FFmpeg if (dec->audio_temp_fp) { fclose(dec->audio_temp_fp);