From 3f8cf6a38c32b71a94a34773aeb96fdd73d002e6 Mon Sep 17 00:00:00 2001 From: minjaesong Date: Sun, 30 Nov 2025 13:41:41 +0900 Subject: [PATCH] tav: lean memory usage --- video_encoder/encoder_tav.c | 295 ++++++++++++++++-------------------- 1 file changed, 129 insertions(+), 166 deletions(-) diff --git a/video_encoder/encoder_tav.c b/video_encoder/encoder_tav.c index 606a407..bdf9217 100644 --- a/video_encoder/encoder_tav.c +++ b/video_encoder/encoder_tav.c @@ -2031,8 +2031,8 @@ typedef struct gop_slot { volatile int status; // GOP_STATUS_* values int gop_index; // Sequential GOP number for ordering - // Input data (NEW: frame indices instead of pointers for complete isolation) - int frame_start_index; // Start index into global frame pool + // Input data (Circular buffering: each slot owns its frame buffers) + uint8_t **rgb_frames; // [slot_capacity][width*height*3] RGB frame buffers int num_frames; // Number of frames in this GOP int *frame_numbers; // Original frame indices (for timecodes) float *pcm_samples; // [num_audio_samples*2] stereo L,R,L,R,... 
@@ -2099,13 +2099,8 @@ typedef struct thread_pool { int num_slots; // N = 2 * num_threads int slot_capacity; // Max frames per GOP - // Global frame pool (complete frame isolation) - uint8_t **global_frame_pool; // [total_frames][width*height*3] - int total_frames; // Total frames in pool - size_t frame_rgb_size; // Bytes per frame (width*height*3) - - // Producer state - int next_gop_to_fill; // Next GOP index to produce + // Producer state (circular buffering) + int next_slot_to_fill; // Next slot index to fill (circular) int total_gops_produced; // Total produced so far int total_frames_produced; // Total frames produced int producer_finished; // 1 when no more frames, -1 on error @@ -3005,8 +3000,29 @@ static gop_slot_t* init_gop_slots(int num_slots, int width, int height, int capa mtx_init(&slot->mutex, mtx_plain); cnd_init(&slot->status_changed); - // NO LONGER ALLOCATE rgb_frames - use global frame pool instead - slot->frame_start_index = -1; // Will be set by producer + // Allocate rgb_frames array (circular buffering: each slot owns its frames) + slot->rgb_frames = calloc(capacity, sizeof(uint8_t*)); + if (!slot->rgb_frames) { + fprintf(stderr, "Error: Cannot allocate GOP slot %d rgb_frames array\n", i); + destroy_gop_slots(slots, i, capacity); + return NULL; + } + + // Allocate each frame buffer + for (int j = 0; j < capacity; j++) { + slot->rgb_frames[j] = malloc(frame_rgb_size); + if (!slot->rgb_frames[j]) { + fprintf(stderr, "Error: Cannot allocate GOP slot %d frame buffer %d\n", i, j); + // Free already allocated frames in this slot + for (int k = 0; k < j; k++) { + free(slot->rgb_frames[k]); + } + free(slot->rgb_frames); + destroy_gop_slots(slots, i, capacity); + return NULL; + } + } + slot->num_frames = 0; // Allocate frame numbers array @@ -3033,9 +3049,9 @@ static gop_slot_t* init_gop_slots(int num_slots, int width, int height, int capa slot->encoding_failed = 0; } - // Memory calculation (excluding frame buffers which are now in global pool) - 
size_t total_memory = num_slots * (max_audio_samples * sizeof(float)); - printf("Allocated %d GOP slots (%.1f MB total, excluding global frame pool)\n", num_slots, total_memory / (1024.0 * 1024.0)); + // Memory calculation (including frame buffers for circular buffering) + size_t total_memory = num_slots * ((capacity * frame_rgb_size) + (max_audio_samples * sizeof(float))); + printf("Allocated %d GOP slots (%.1f MB total)\n", num_slots, total_memory / (1024.0 * 1024.0)); return slots; } @@ -3121,7 +3137,13 @@ static void destroy_gop_slots(gop_slot_t *slots, int num_slots, int capacity) { for (int i = 0; i < num_slots; i++) { gop_slot_t *slot = &slots[i]; - // NO LONGER FREE rgb_frames (using global frame pool instead) + // Free rgb_frames (circular buffering: each slot owns its frames) + if (slot->rgb_frames) { + for (int j = 0; j < capacity; j++) { + free(slot->rgb_frames[j]); + } + free(slot->rgb_frames); + } free(slot->frame_numbers); free(slot->pcm_samples); @@ -3188,10 +3210,7 @@ static thread_pool_t* create_thread_pool(tav_encoder_t *enc, int num_threads, in pool->slot_capacity = TEMPORAL_GOP_SIZE; pool->shared_enc = enc; pool->shutdown = 0; - pool->producer_finished = 0; - pool->next_gop_to_fill = 0; - pool->total_gops_produced = 0; - pool->total_frames_produced = 0; + // Producer state is initialized below, after the condition variables are set up pool->next_gop_to_write = 0; pool->total_gops_written = 0; @@ -3207,54 +3226,13 @@ static thread_pool_t* create_thread_pool(tav_encoder_t *enc, int num_threads, in cnd_init(&pool->job_available); cnd_init(&pool->slot_available); - // GLOBAL FRAME POOL: Calculate total frames needed from GOP boundaries - int total_frames = 0; - gop_boundary_t *boundary = enc->gop_boundaries; - while (boundary) { - int gop_frames = (boundary->end_frame - boundary->start_frame) + 1; - total_frames += gop_frames; - boundary = boundary->next; - } + // Initialize producer state for circular buffering + pool->next_slot_to_fill = 0; + 
pool->total_gops_produced = 0; + pool->total_frames_produced = 0; + pool->producer_finished = 0; - printf("Allocating global frame pool: %d frames\n", total_frames); - - // Allocate global frame pool - pool->total_frames = total_frames; - pool->frame_rgb_size = enc->width * enc->height * 3; - pool->global_frame_pool = calloc(total_frames, sizeof(uint8_t*)); - if (!pool->global_frame_pool) { - fprintf(stderr, "Error: Failed to allocate global frame pool array\n"); - free(pool->job_queue); - mtx_destroy(&pool->job_queue_mutex); - cnd_destroy(&pool->job_available); - cnd_destroy(&pool->slot_available); - free(pool); - return NULL; - } - - // Allocate each frame buffer - for (int i = 0; i < total_frames; i++) { - pool->global_frame_pool[i] = malloc(pool->frame_rgb_size); - if (!pool->global_frame_pool[i]) { - fprintf(stderr, "Error: Failed to allocate frame buffer %d/%d\n", i, total_frames); - // Free already allocated frames - for (int j = 0; j < i; j++) { - free(pool->global_frame_pool[j]); - } - free(pool->global_frame_pool); - free(pool->job_queue); - mtx_destroy(&pool->job_queue_mutex); - cnd_destroy(&pool->job_available); - cnd_destroy(&pool->slot_available); - free(pool); - return NULL; - } - } - - printf("Global frame pool allocated: %zu MB\n", - (total_frames * pool->frame_rgb_size) / (1024*1024)); - - // Allocate GOP slots + // Allocate GOP slots (each slot owns its own frame buffers) pool->slots = init_gop_slots(pool->num_slots, enc->width, enc->height, pool->slot_capacity); if (!pool->slots) { fprintf(stderr, "Error: Failed to allocate GOP slots\n"); @@ -3398,17 +3376,9 @@ static void shutdown_thread_pool(thread_pool_t *pool) { pool->writer_thread_joined = 1; } - // Destroy slots + // Destroy slots (each slot owns its own frame buffers) destroy_gop_slots(pool->slots, pool->num_slots, pool->slot_capacity); - // Free global frame pool - if (pool->global_frame_pool) { - for (int i = 0; i < pool->total_frames; i++) { - free(pool->global_frame_pool[i]); - } - 
free(pool->global_frame_pool); - } - // Free job queue free(pool->job_queue); @@ -3488,9 +3458,9 @@ static int worker_thread_main(void *arg) { printf("worker_thread slot_idx=%d, num_frames=%d\n", slot_idx, num_frames); // Step 1: Convert RGB to YCoCg-R (or ICtCp) - // Access frames from global pool via slot's frame range + // Access frames from slot's own frame buffers (circular buffering) for (int i = 0; i < num_frames; i++) { - uint8_t *rgb_frame = pool->global_frame_pool[slot->frame_start_index + i]; + uint8_t *rgb_frame = slot->rgb_frames[i]; rgb_to_colour_space_frame(enc, rgb_frame, ctx->work_y_frames[i], ctx->work_co_frames[i], ctx->work_cg_frames[i], @@ -3708,35 +3678,61 @@ static int worker_thread_main(void *arg) { // Multi-Threading - Producer Thread // ============================================================================= -/** - * Producer thread: Read frames from FFmpeg and fill GOP slots - */ /** * Producer thread: Read frames from FFmpeg and fill GOP slots according to GOP boundaries - * UPFRONT ALLOCATION VERSION: Uses slot index = GOP index, no circular buffer reuse + * CIRCULAR BUFFERING VERSION: Waits for empty slots, reads frames on-demand, slots reused */ static int producer_thread_main(void *arg) { thread_pool_t *pool = (thread_pool_t*)arg; tav_encoder_t *enc = pool->shared_enc; - printf("Producer thread starting (global frame pool mode)\n"); - printf(" Total GOP slots: %d\n", pool->num_slots); - printf(" Total frames to read: %d\n", pool->total_frames); + printf("Producer thread starting (circular buffering mode)\n"); + printf(" GOP buffer slots: %d\n", pool->num_slots); - // STEP 1: Read ALL frames sequentially into global pool - printf("Reading all frames into global pool...\n"); - for (int i = 0; i < pool->total_frames; i++) { - size_t bytes_read = fread(pool->global_frame_pool[i], 1, - pool->frame_rgb_size, enc->ffmpeg_video_pipe); + gop_boundary_t *current_gop_boundary = enc->gop_boundaries; + int global_frame_number = 0; - if 
(bytes_read != pool->frame_rgb_size) { - if (feof(enc->ffmpeg_video_pipe)) { - fprintf(stderr, "WARNING: EOF at frame %d (expected %d frames)\n", - i, pool->total_frames); - pool->total_frames = i; // Adjust total - break; - } else { - fprintf(stderr, "Error: FFmpeg pipe read failed at frame %d\n", i); + while (current_gop_boundary) { + // 1. Wait for an empty slot (circular buffering) + int slot_idx; + gop_slot_t *slot = get_empty_slot(pool, &slot_idx); + if (!slot) { + // Shutdown signal received + break; + } + + // 2. Calculate frames for this GOP + int expected_frames = current_gop_boundary->num_frames; + if (expected_frames <= 0) { + expected_frames = (current_gop_boundary->end_frame - current_gop_boundary->start_frame) + 1; + } + + // Verify slot has enough capacity + if (expected_frames > pool->slot_capacity) { + fprintf(stderr, "Error: GOP requires %d frames but slot capacity is %d\n", + expected_frames, pool->slot_capacity); + mtx_lock(&pool->job_queue_mutex); + pool->producer_finished = -1; + pool->shutdown = 1; + cnd_broadcast(&pool->job_available); + mtx_unlock(&pool->job_queue_mutex); + return -1; + } + + // 3. 
Read frames directly into slot->rgb_frames + int frames_read = 0; + size_t frame_rgb_size = enc->width * enc->height * 3; + + for (int i = 0; i < expected_frames; i++) { + size_t bytes_read = fread(slot->rgb_frames[i], 1, frame_rgb_size, enc->ffmpeg_video_pipe); + + if (bytes_read != frame_rgb_size) { + if (feof(enc->ffmpeg_video_pipe)) { + fprintf(stderr, "WARNING: EOF at frame %d\n", global_frame_number + i); + break; + } + // Handle error + fprintf(stderr, "Error: FFmpeg pipe read failed at frame %d\n", global_frame_number + i); mtx_lock(&pool->job_queue_mutex); pool->producer_finished = -1; pool->shutdown = 1; @@ -3744,89 +3740,55 @@ static int producer_thread_main(void *arg) { mtx_unlock(&pool->job_queue_mutex); return -1; } + + slot->frame_numbers[i] = current_gop_boundary->start_frame + i; + frames_read++; } - if ((i + 1) % 500 == 0) { - printf(" Read %d frames...\n", i + 1); - } - } - - printf("All frames read into global pool: %d frames\n", pool->total_frames); - - // STEP 2: Assign frame ranges to GOP slots - int global_frame_idx = 0; - int gop_index = 0; - gop_boundary_t *current_gop_boundary = enc->gop_boundaries; - - while (current_gop_boundary && gop_index < pool->num_slots) { - gop_slot_t *slot = &pool->slots[gop_index]; - - int expected_frames = current_gop_boundary->num_frames; - if (expected_frames <= 0) { - expected_frames = (current_gop_boundary->end_frame - current_gop_boundary->start_frame) + 1; + // If we hit EOF mid-GOP, still process what we read + if (frames_read == 0) { + // Mark slot as empty again and exit + mtx_lock(&slot->mutex); + slot->status = GOP_STATUS_EMPTY; + mtx_unlock(&slot->mutex); + cnd_broadcast(&pool->slot_available); + break; } - // Check we haven't exceeded total frames - if (global_frame_idx + expected_frames > pool->total_frames) { - fprintf(stderr, "WARNING: GOP %d would exceed total frames (%d + %d > %d)\n", - gop_index, global_frame_idx, expected_frames, pool->total_frames); - expected_frames = pool->total_frames - 
global_frame_idx; - if (expected_frames <= 0) break; + // 4. Read audio for this GOP (if applicable) + if (enc->pcm_file && (enc->tad_audio || enc->pcm8_audio)) { + size_t total_audio_samples = frames_read * enc->samples_per_frame; + size_t audio_bytes = total_audio_samples * 2 * sizeof(float); + size_t audio_read = fread(slot->pcm_samples, 1, audio_bytes, enc->pcm_file); + slot->num_audio_samples = audio_read / (2 * sizeof(float)); } - // Initialize slot with frame range + // 5. Initialize slot metadata mtx_lock(&slot->mutex); - slot->gop_index = gop_index; - slot->frame_start_index = global_frame_idx; // Start index in global pool - slot->num_frames = expected_frames; + slot->gop_index = pool->total_gops_produced; + slot->num_frames = frames_read; slot->width = enc->width; slot->height = enc->height; - slot->status = GOP_STATUS_FILLING; - - // Set frame numbers for timecodes - for (int i = 0; i < expected_frames; i++) { - slot->frame_numbers[i] = current_gop_boundary->start_frame + i; - } - mtx_unlock(&slot->mutex); - - // Read audio for this GOP - if (enc->pcm_file && (enc->tad_audio || enc->pcm8_audio)) { - size_t total_audio_samples = expected_frames * enc->samples_per_frame; - size_t audio_bytes = total_audio_samples * 2 * sizeof(float); - - size_t audio_read = fread(slot->pcm_samples, 1, audio_bytes, enc->pcm_file); - if (audio_read == audio_bytes) { - slot->num_audio_samples = total_audio_samples; - } else { - fprintf(stderr, "Warning: Incomplete audio read for GOP %d\n", gop_index); - slot->num_audio_samples = audio_read / (2 * sizeof(float)); - } - } - - // Enqueue GOP for workers - mtx_lock(&pool->job_queue_mutex); - pool->job_queue[pool->job_queue_tail] = gop_index; - pool->job_queue_tail = (pool->job_queue_tail + 1) % pool->job_queue_capacity; - pool->job_queue_size++; - pool->total_gops_produced++; - pool->total_frames_produced += expected_frames; - - // Mark slot as ready - mtx_lock(&slot->mutex); slot->status = GOP_STATUS_READY; 
mtx_unlock(&slot->mutex); + // 6. Enqueue GOP for workers + mtx_lock(&pool->job_queue_mutex); + pool->job_queue[pool->job_queue_tail] = slot_idx; + pool->job_queue_tail = (pool->job_queue_tail + 1) % pool->job_queue_capacity; + pool->job_queue_size++; + pool->total_gops_produced++; + pool->total_frames_produced += frames_read; cnd_broadcast(&pool->job_available); mtx_unlock(&pool->job_queue_mutex); - if ((gop_index + 1) % 50 == 0) { - printf("Producer: Assigned %d GOPs\n", gop_index + 1); + if (pool->total_gops_produced % 50 == 0) { + printf("Producer: %d GOPs queued\n", pool->total_gops_produced); } - // Move to next GOP - global_frame_idx += expected_frames; + // 7. Move to next GOP + global_frame_number += frames_read; current_gop_boundary = current_gop_boundary->next; - gop_index++; } // Signal completion @@ -3836,7 +3798,7 @@ static int producer_thread_main(void *arg) { mtx_unlock(&pool->job_queue_mutex); printf("Producer thread complete: %d frames read, %d GOPs assigned\n", - pool->total_frames, pool->total_gops_produced); + pool->total_frames_produced, pool->total_gops_produced); return 0; } @@ -3954,12 +3916,13 @@ static int writer_thread_main(void *arg) { mtx_unlock(&slot->mutex); - // UPFRONT ALLOCATION: Do NOT free slots - each slot is used exactly once for its GOP - // free_gop_slot(slot); // REMOVED: breaks upfront allocation model + // CIRCULAR BUFFERING: Free slot and signal producer + free_gop_slot(slot); + mtx_lock(&pool->job_queue_mutex); pool->total_gops_written++; int gops_written = pool->total_gops_written; - // cnd_signal(&pool->slot_available); // REMOVED: not needed with upfront allocation + cnd_broadcast(&pool->slot_available); // Wake up producer waiting for empty slot mtx_unlock(&pool->job_queue_mutex); // Progress reporting (using saved values)