tav: first working multithreaded encoder

This commit is contained in:
minjaesong
2025-11-30 13:24:24 +09:00
parent 5ecf2dcadd
commit 902d971ae7

View File

@@ -20,7 +20,7 @@
#include <threads.h> // C11 threads for multi-threading #include <threads.h> // C11 threads for multi-threading
#include "tav_avx512.h" // AVX-512 SIMD optimisations #include "tav_avx512.h" // AVX-512 SIMD optimisations
#define ENCODER_VENDOR_STRING "Encoder-TAV 20251128 (3d-dwt,tad,ssf-tc,cdf53-motion,avx512,presets)" #define ENCODER_VENDOR_STRING "Encoder-TAV 20251130 (3d-dwt,tad,ssf-tc,cdf53-motion,avx512,presets,mt)"
// TSVM Advanced Video (TAV) format constants // TSVM Advanced Video (TAV) format constants
#define TAV_MAGIC "\x1F\x54\x53\x56\x4D\x54\x41\x56" // "\x1FTSVM TAV" #define TAV_MAGIC "\x1F\x54\x53\x56\x4D\x54\x41\x56" // "\x1FTSVM TAV"
@@ -2088,6 +2088,12 @@ typedef struct thread_pool {
thrd_t producer_thread; // Producer thread handle thrd_t producer_thread; // Producer thread handle
thrd_t writer_thread; // Writer thread handle thrd_t writer_thread; // Writer thread handle
// Thread lifecycle tracking (prevent double-join)
int producer_thread_created; // 1 if producer thread was created
int writer_thread_created; // 1 if writer thread was created
int producer_thread_joined; // 1 if producer thread was joined
int writer_thread_joined; // 1 if writer thread was joined
// Circular buffer of GOP slots // Circular buffer of GOP slots
gop_slot_t *slots; // Array of slots gop_slot_t *slots; // Array of slots
int num_slots; // N = 2 * num_threads int num_slots; // N = 2 * num_threads
@@ -3374,9 +3380,10 @@ static void shutdown_thread_pool(thread_pool_t *pool) {
cnd_broadcast(&pool->slot_available); cnd_broadcast(&pool->slot_available);
mtx_unlock(&pool->job_queue_mutex); mtx_unlock(&pool->job_queue_mutex);
// Wait for producer thread // Wait for producer thread (only if created and not already joined)
if (pool->producer_finished != 0) { // Only join if started if (pool->producer_thread_created && !pool->producer_thread_joined) {
thrd_join(pool->producer_thread, NULL); thrd_join(pool->producer_thread, NULL);
pool->producer_thread_joined = 1;
} }
// Wait for all worker threads // Wait for all worker threads
@@ -3385,9 +3392,10 @@ static void shutdown_thread_pool(thread_pool_t *pool) {
thrd_join(pool->worker_threads[i], &result); thrd_join(pool->worker_threads[i], &result);
} }
// Wait for writer thread // Wait for writer thread (only if created and not already joined)
if (pool->total_gops_written > 0 || pool->total_gops_produced > 0) { // Only join if started if (pool->writer_thread_created && !pool->writer_thread_joined) {
thrd_join(pool->writer_thread, NULL); thrd_join(pool->writer_thread, NULL);
pool->writer_thread_joined = 1;
} }
// Destroy slots // Destroy slots
@@ -12595,6 +12603,7 @@ int main(int argc, char *argv[]) {
cleanup_encoder(enc); cleanup_encoder(enc);
return 1; return 1;
} }
enc->thread_pool->producer_thread_created = 1;
// Start writer thread // Start writer thread
if (thrd_create(&enc->thread_pool->writer_thread, writer_thread_main, if (thrd_create(&enc->thread_pool->writer_thread, writer_thread_main,
@@ -12603,16 +12612,20 @@ int main(int argc, char *argv[]) {
mtx_lock(&enc->thread_pool->job_queue_mutex); mtx_lock(&enc->thread_pool->job_queue_mutex);
enc->thread_pool->shutdown = 1; enc->thread_pool->shutdown = 1;
mtx_unlock(&enc->thread_pool->job_queue_mutex); mtx_unlock(&enc->thread_pool->job_queue_mutex);
// Join producer thread manually and mark as joined
thrd_join(enc->thread_pool->producer_thread, NULL); thrd_join(enc->thread_pool->producer_thread, NULL);
enc->thread_pool->producer_thread_joined = 1;
shutdown_thread_pool(enc->thread_pool); shutdown_thread_pool(enc->thread_pool);
enc->thread_pool = NULL; enc->thread_pool = NULL;
cleanup_encoder(enc); cleanup_encoder(enc);
return 1; return 1;
} }
enc->thread_pool->writer_thread_created = 1;
// Wait for writer to complete (it waits for producer and workers) // Wait for writer to complete (it waits for producer and workers)
int writer_result; int writer_result;
thrd_join(enc->thread_pool->writer_thread, &writer_result); thrd_join(enc->thread_pool->writer_thread, &writer_result);
enc->thread_pool->writer_thread_joined = 1;
if (writer_result != 0) { if (writer_result != 0) {
fprintf(stderr, "Error: Writer thread failed\n"); fprintf(stderr, "Error: Writer thread failed\n");