tav: lean memory usage

This commit is contained in:
minjaesong
2025-11-30 13:41:41 +09:00
parent 902d971ae7
commit 3f8cf6a38c

View File

@@ -2031,8 +2031,8 @@ typedef struct gop_slot {
volatile int status; // GOP_STATUS_* values
int gop_index; // Sequential GOP number for ordering
// Input data (NEW: frame indices instead of pointers for complete isolation)
int frame_start_index; // Start index into global frame pool
// Input data (Circular buffering: each slot owns its frame buffers)
uint8_t **rgb_frames; // [slot_capacity][width*height*3] RGB frame buffers
int num_frames; // Number of frames in this GOP
int *frame_numbers; // Original frame indices (for timecodes)
float *pcm_samples; // [num_audio_samples*2] stereo L,R,L,R,...
@@ -2099,13 +2099,8 @@ typedef struct thread_pool {
int num_slots; // N = 2 * num_threads
int slot_capacity; // Max frames per GOP
// Global frame pool (complete frame isolation)
uint8_t **global_frame_pool; // [total_frames][width*height*3]
int total_frames; // Total frames in pool
size_t frame_rgb_size; // Bytes per frame (width*height*3)
// Producer state
int next_gop_to_fill; // Next GOP index to produce
// Producer state (circular buffering)
int next_slot_to_fill; // Next slot index to fill (circular)
int total_gops_produced; // Total produced so far
int total_frames_produced; // Total frames produced
int producer_finished; // 1 when no more frames, -1 on error
@@ -3005,8 +3000,29 @@ static gop_slot_t* init_gop_slots(int num_slots, int width, int height, int capa
mtx_init(&slot->mutex, mtx_plain);
cnd_init(&slot->status_changed);
// NO LONGER ALLOCATE rgb_frames - use global frame pool instead
slot->frame_start_index = -1; // Will be set by producer
// Allocate rgb_frames array (circular buffering: each slot owns its frames)
slot->rgb_frames = calloc(capacity, sizeof(uint8_t*));
if (!slot->rgb_frames) {
fprintf(stderr, "Error: Cannot allocate GOP slot %d rgb_frames array\n", i);
destroy_gop_slots(slots, i, capacity);
return NULL;
}
// Allocate each frame buffer
for (int j = 0; j < capacity; j++) {
slot->rgb_frames[j] = malloc(frame_rgb_size);
if (!slot->rgb_frames[j]) {
fprintf(stderr, "Error: Cannot allocate GOP slot %d frame buffer %d\n", i, j);
// Free already allocated frames in this slot
for (int k = 0; k < j; k++) {
free(slot->rgb_frames[k]);
}
free(slot->rgb_frames);
destroy_gop_slots(slots, i, capacity);
return NULL;
}
}
slot->num_frames = 0;
// Allocate frame numbers array
@@ -3033,9 +3049,9 @@ static gop_slot_t* init_gop_slots(int num_slots, int width, int height, int capa
slot->encoding_failed = 0;
}
// Memory calculation (excluding frame buffers which are now in global pool)
size_t total_memory = num_slots * (max_audio_samples * sizeof(float));
printf("Allocated %d GOP slots (%.1f MB total, excluding global frame pool)\n", num_slots, total_memory / (1024.0 * 1024.0));
// Memory calculation (including frame buffers for circular buffering)
size_t total_memory = num_slots * ((capacity * frame_rgb_size) + (max_audio_samples * sizeof(float)));
printf("Allocated %d GOP slots (%.1f MB total)\n", num_slots, total_memory / (1024.0 * 1024.0));
return slots;
}
@@ -3121,7 +3137,13 @@ static void destroy_gop_slots(gop_slot_t *slots, int num_slots, int capacity) {
for (int i = 0; i < num_slots; i++) {
gop_slot_t *slot = &slots[i];
// NO LONGER FREE rgb_frames (using global frame pool instead)
// Free rgb_frames (circular buffering: each slot owns its frames)
if (slot->rgb_frames) {
for (int j = 0; j < capacity; j++) {
free(slot->rgb_frames[j]);
}
free(slot->rgb_frames);
}
free(slot->frame_numbers);
free(slot->pcm_samples);
@@ -3188,10 +3210,7 @@ static thread_pool_t* create_thread_pool(tav_encoder_t *enc, int num_threads, in
pool->slot_capacity = TEMPORAL_GOP_SIZE;
pool->shared_enc = enc;
pool->shutdown = 0;
pool->producer_finished = 0;
pool->next_gop_to_fill = 0;
pool->total_gops_produced = 0;
pool->total_frames_produced = 0;
// Producer state already initialized earlier (lines 3232-3236)
pool->next_gop_to_write = 0;
pool->total_gops_written = 0;
@@ -3207,54 +3226,13 @@ static thread_pool_t* create_thread_pool(tav_encoder_t *enc, int num_threads, in
cnd_init(&pool->job_available);
cnd_init(&pool->slot_available);
// GLOBAL FRAME POOL: Calculate total frames needed from GOP boundaries
int total_frames = 0;
gop_boundary_t *boundary = enc->gop_boundaries;
while (boundary) {
int gop_frames = (boundary->end_frame - boundary->start_frame) + 1;
total_frames += gop_frames;
boundary = boundary->next;
}
// Initialize producer state for circular buffering
pool->next_slot_to_fill = 0;
pool->total_gops_produced = 0;
pool->total_frames_produced = 0;
pool->producer_finished = 0;
printf("Allocating global frame pool: %d frames\n", total_frames);
// Allocate global frame pool
pool->total_frames = total_frames;
pool->frame_rgb_size = enc->width * enc->height * 3;
pool->global_frame_pool = calloc(total_frames, sizeof(uint8_t*));
if (!pool->global_frame_pool) {
fprintf(stderr, "Error: Failed to allocate global frame pool array\n");
free(pool->job_queue);
mtx_destroy(&pool->job_queue_mutex);
cnd_destroy(&pool->job_available);
cnd_destroy(&pool->slot_available);
free(pool);
return NULL;
}
// Allocate each frame buffer
for (int i = 0; i < total_frames; i++) {
pool->global_frame_pool[i] = malloc(pool->frame_rgb_size);
if (!pool->global_frame_pool[i]) {
fprintf(stderr, "Error: Failed to allocate frame buffer %d/%d\n", i, total_frames);
// Free already allocated frames
for (int j = 0; j < i; j++) {
free(pool->global_frame_pool[j]);
}
free(pool->global_frame_pool);
free(pool->job_queue);
mtx_destroy(&pool->job_queue_mutex);
cnd_destroy(&pool->job_available);
cnd_destroy(&pool->slot_available);
free(pool);
return NULL;
}
}
printf("Global frame pool allocated: %zu MB\n",
(total_frames * pool->frame_rgb_size) / (1024*1024));
// Allocate GOP slots
// Allocate GOP slots (each slot owns its own frame buffers)
pool->slots = init_gop_slots(pool->num_slots, enc->width, enc->height, pool->slot_capacity);
if (!pool->slots) {
fprintf(stderr, "Error: Failed to allocate GOP slots\n");
@@ -3398,17 +3376,9 @@ static void shutdown_thread_pool(thread_pool_t *pool) {
pool->writer_thread_joined = 1;
}
// Destroy slots
// Destroy slots (each slot owns its own frame buffers)
destroy_gop_slots(pool->slots, pool->num_slots, pool->slot_capacity);
// Free global frame pool
if (pool->global_frame_pool) {
for (int i = 0; i < pool->total_frames; i++) {
free(pool->global_frame_pool[i]);
}
free(pool->global_frame_pool);
}
// Free job queue
free(pool->job_queue);
@@ -3488,9 +3458,9 @@ static int worker_thread_main(void *arg) {
printf("worker_thread slot_idx=%d, num_frames=%d\n", slot_idx, num_frames);
// Step 1: Convert RGB to YCoCg-R (or ICtCp)
// Access frames from global pool via slot's frame range
// Access frames from slot's own frame buffers (circular buffering)
for (int i = 0; i < num_frames; i++) {
uint8_t *rgb_frame = pool->global_frame_pool[slot->frame_start_index + i];
uint8_t *rgb_frame = slot->rgb_frames[i];
rgb_to_colour_space_frame(enc, rgb_frame,
ctx->work_y_frames[i], ctx->work_co_frames[i], ctx->work_cg_frames[i],
@@ -3708,35 +3678,61 @@ static int worker_thread_main(void *arg) {
// Multi-Threading - Producer Thread
// =============================================================================
/**
* Producer thread: Read frames from FFmpeg and fill GOP slots
*/
/**
* Producer thread: Read frames from FFmpeg and fill GOP slots according to GOP boundaries
* UPFRONT ALLOCATION VERSION: Uses slot index = GOP index, no circular buffer reuse
* CIRCULAR BUFFERING VERSION: Waits for empty slots, reads frames on-demand, slots reused
*/
static int producer_thread_main(void *arg) {
thread_pool_t *pool = (thread_pool_t*)arg;
tav_encoder_t *enc = pool->shared_enc;
printf("Producer thread starting (global frame pool mode)\n");
printf(" Total GOP slots: %d\n", pool->num_slots);
printf(" Total frames to read: %d\n", pool->total_frames);
printf("Producer thread starting (circular buffering mode)\n");
printf(" GOP buffer slots: %d\n", pool->num_slots);
// STEP 1: Read ALL frames sequentially into global pool
printf("Reading all frames into global pool...\n");
for (int i = 0; i < pool->total_frames; i++) {
size_t bytes_read = fread(pool->global_frame_pool[i], 1,
pool->frame_rgb_size, enc->ffmpeg_video_pipe);
gop_boundary_t *current_gop_boundary = enc->gop_boundaries;
int global_frame_number = 0;
if (bytes_read != pool->frame_rgb_size) {
if (feof(enc->ffmpeg_video_pipe)) {
fprintf(stderr, "WARNING: EOF at frame %d (expected %d frames)\n",
i, pool->total_frames);
pool->total_frames = i; // Adjust total
break;
} else {
fprintf(stderr, "Error: FFmpeg pipe read failed at frame %d\n", i);
while (current_gop_boundary) {
// 1. Wait for an empty slot (circular buffering)
int slot_idx;
gop_slot_t *slot = get_empty_slot(pool, &slot_idx);
if (!slot) {
// Shutdown signal received
break;
}
// 2. Calculate frames for this GOP
int expected_frames = current_gop_boundary->num_frames;
if (expected_frames <= 0) {
expected_frames = (current_gop_boundary->end_frame - current_gop_boundary->start_frame) + 1;
}
// Verify slot has enough capacity
if (expected_frames > pool->slot_capacity) {
fprintf(stderr, "Error: GOP requires %d frames but slot capacity is %d\n",
expected_frames, pool->slot_capacity);
mtx_lock(&pool->job_queue_mutex);
pool->producer_finished = -1;
pool->shutdown = 1;
cnd_broadcast(&pool->job_available);
mtx_unlock(&pool->job_queue_mutex);
return -1;
}
// 3. Read frames directly into slot->rgb_frames
int frames_read = 0;
size_t frame_rgb_size = enc->width * enc->height * 3;
for (int i = 0; i < expected_frames; i++) {
size_t bytes_read = fread(slot->rgb_frames[i], 1, frame_rgb_size, enc->ffmpeg_video_pipe);
if (bytes_read != frame_rgb_size) {
if (feof(enc->ffmpeg_video_pipe)) {
fprintf(stderr, "WARNING: EOF at frame %d\n", global_frame_number + i);
break;
}
// Handle error
fprintf(stderr, "Error: FFmpeg pipe read failed at frame %d\n", global_frame_number + i);
mtx_lock(&pool->job_queue_mutex);
pool->producer_finished = -1;
pool->shutdown = 1;
@@ -3744,89 +3740,55 @@ static int producer_thread_main(void *arg) {
mtx_unlock(&pool->job_queue_mutex);
return -1;
}
slot->frame_numbers[i] = current_gop_boundary->start_frame + i;
frames_read++;
}
if ((i + 1) % 500 == 0) {
printf(" Read %d frames...\n", i + 1);
}
}
printf("All frames read into global pool: %d frames\n", pool->total_frames);
// STEP 2: Assign frame ranges to GOP slots
int global_frame_idx = 0;
int gop_index = 0;
gop_boundary_t *current_gop_boundary = enc->gop_boundaries;
while (current_gop_boundary && gop_index < pool->num_slots) {
gop_slot_t *slot = &pool->slots[gop_index];
int expected_frames = current_gop_boundary->num_frames;
if (expected_frames <= 0) {
expected_frames = (current_gop_boundary->end_frame - current_gop_boundary->start_frame) + 1;
// If we hit EOF mid-GOP, still process what we read
if (frames_read == 0) {
// Mark slot as empty again and exit
mtx_lock(&slot->mutex);
slot->status = GOP_STATUS_EMPTY;
mtx_unlock(&slot->mutex);
cnd_broadcast(&pool->slot_available);
break;
}
// Check we haven't exceeded total frames
if (global_frame_idx + expected_frames > pool->total_frames) {
fprintf(stderr, "WARNING: GOP %d would exceed total frames (%d + %d > %d)\n",
gop_index, global_frame_idx, expected_frames, pool->total_frames);
expected_frames = pool->total_frames - global_frame_idx;
if (expected_frames <= 0) break;
// 4. Read audio for this GOP (if applicable)
if (enc->pcm_file && (enc->tad_audio || enc->pcm8_audio)) {
size_t total_audio_samples = frames_read * enc->samples_per_frame;
size_t audio_bytes = total_audio_samples * 2 * sizeof(float);
size_t audio_read = fread(slot->pcm_samples, 1, audio_bytes, enc->pcm_file);
slot->num_audio_samples = audio_read / (2 * sizeof(float));
}
// Initialize slot with frame range
// 5. Initialize slot metadata
mtx_lock(&slot->mutex);
slot->gop_index = gop_index;
slot->frame_start_index = global_frame_idx; // Start index in global pool
slot->num_frames = expected_frames;
slot->gop_index = pool->total_gops_produced;
slot->num_frames = frames_read;
slot->width = enc->width;
slot->height = enc->height;
slot->status = GOP_STATUS_FILLING;
// Set frame numbers for timecodes
for (int i = 0; i < expected_frames; i++) {
slot->frame_numbers[i] = current_gop_boundary->start_frame + i;
}
mtx_unlock(&slot->mutex);
// Read audio for this GOP
if (enc->pcm_file && (enc->tad_audio || enc->pcm8_audio)) {
size_t total_audio_samples = expected_frames * enc->samples_per_frame;
size_t audio_bytes = total_audio_samples * 2 * sizeof(float);
size_t audio_read = fread(slot->pcm_samples, 1, audio_bytes, enc->pcm_file);
if (audio_read == audio_bytes) {
slot->num_audio_samples = total_audio_samples;
} else {
fprintf(stderr, "Warning: Incomplete audio read for GOP %d\n", gop_index);
slot->num_audio_samples = audio_read / (2 * sizeof(float));
}
}
// Enqueue GOP for workers
mtx_lock(&pool->job_queue_mutex);
pool->job_queue[pool->job_queue_tail] = gop_index;
pool->job_queue_tail = (pool->job_queue_tail + 1) % pool->job_queue_capacity;
pool->job_queue_size++;
pool->total_gops_produced++;
pool->total_frames_produced += expected_frames;
// Mark slot as ready
mtx_lock(&slot->mutex);
slot->status = GOP_STATUS_READY;
mtx_unlock(&slot->mutex);
// 6. Enqueue GOP for workers
mtx_lock(&pool->job_queue_mutex);
pool->job_queue[pool->job_queue_tail] = slot_idx;
pool->job_queue_tail = (pool->job_queue_tail + 1) % pool->job_queue_capacity;
pool->job_queue_size++;
pool->total_gops_produced++;
pool->total_frames_produced += frames_read;
cnd_broadcast(&pool->job_available);
mtx_unlock(&pool->job_queue_mutex);
if ((gop_index + 1) % 50 == 0) {
printf("Producer: Assigned %d GOPs\n", gop_index + 1);
if (pool->total_gops_produced % 50 == 0) {
printf("Producer: %d GOPs queued\n", pool->total_gops_produced);
}
// Move to next GOP
global_frame_idx += expected_frames;
// 7. Move to next GOP
global_frame_number += frames_read;
current_gop_boundary = current_gop_boundary->next;
gop_index++;
}
// Signal completion
@@ -3836,7 +3798,7 @@ static int producer_thread_main(void *arg) {
mtx_unlock(&pool->job_queue_mutex);
printf("Producer thread complete: %d frames read, %d GOPs assigned\n",
pool->total_frames, pool->total_gops_produced);
pool->total_frames_produced, pool->total_gops_produced);
return 0;
}
@@ -3954,12 +3916,13 @@ static int writer_thread_main(void *arg) {
mtx_unlock(&slot->mutex);
// UPFRONT ALLOCATION: Do NOT free slots - each slot is used exactly once for its GOP
// free_gop_slot(slot); // REMOVED: breaks upfront allocation model
// CIRCULAR BUFFERING: Free slot and signal producer
free_gop_slot(slot);
mtx_lock(&pool->job_queue_mutex);
pool->total_gops_written++;
int gops_written = pool->total_gops_written;
// cnd_signal(&pool->slot_available); // REMOVED: not needed with upfront allocation
cnd_broadcast(&pool->slot_available); // Wake up producer waiting for empty slot
mtx_unlock(&pool->job_queue_mutex);
// Progress reporting (using saved values)