mirror of
https://github.com/curioustorvald/tsvm.git
synced 2026-03-09 20:51:51 +09:00
TAV-DT multithreaded encoding
This commit is contained in:
@@ -30,6 +30,7 @@
|
||||
#include <sys/wait.h>
|
||||
#include <signal.h>
|
||||
#include <time.h>
|
||||
#include <pthread.h>
|
||||
|
||||
#include "tav_video_decoder.h"
|
||||
#include "decoder_tad.h"
|
||||
@@ -64,6 +65,53 @@ static const int QUALITY_CO[] = {123, 108, 91, 76, 59, 29};
|
||||
static const int QUALITY_CG[] = {148, 133, 113, 99, 76, 39};
|
||||
|
||||
#define MAX_PATH 4096
#define MAX_DECODE_THREADS 16   // Hard upper bound on worker threads; requested counts are clamped to this

// =============================================================================
// Multithreading Structures
// =============================================================================

// Values stored in gop_decode_job_t.status
#define DECODE_SLOT_EMPTY 0     // Initial state of every slot after init_decoder_threads()
#define DECODE_SLOT_PENDING 1   // Slot holds compressed data that a worker should pick up
#define DECODE_SLOT_DONE 2      // Set by decoder_worker_thread() once it has processed the slot
|
||||
|
||||
/**
 * One GOP decode job. Jobs live in the decoder's slot table and are handed
 * to worker threads, which decode compressed_data into rgb_frames.
 */
typedef struct {
    // ---- Input --------------------------------------------------------------
    uint8_t *compressed_data;   // Raw GOP data to decode (released by the worker after decoding)
    size_t compressed_size;     // Size passed to the decoder together with compressed_data
    int gop_size;               // Number of frames in this GOP
    int job_id;                 // Sequential job ID for ordering output (-1 after init)

    // ---- Output -------------------------------------------------------------
    uint8_t **rgb_frames;       // Decoded RGB24 frames [gop_size]
    int frames_allocated;       // How many entries of rgb_frames have buffers
    int decode_result;          // 0 = success, -1 = error

    // ---- Status -------------------------------------------------------------
    volatile int status;        // One of the DECODE_SLOT_* values
} gop_decode_job_t;
|
||||
|
||||
/**
 * Report how many processors are currently online.
 *
 * Uses sysconf(_SC_NPROCESSORS_ONLN) when the platform defines it.
 *
 * @return the online processor count, or 1 when the query is unavailable
 *         or does not yield a positive value.
 */
static int get_available_cpus(void) {
    int cpu_count = 1;  // Single-core fallback

#ifdef _SC_NPROCESSORS_ONLN
    const long online = sysconf(_SC_NPROCESSORS_ONLN);
    if (online >= 1) {
        cpu_count = (int)online;
    }
#endif

    return cpu_count;
}
|
||||
|
||||
/**
 * Default number of decoder threads: the available CPU count, capped at 8.
 *
 * @return min(get_available_cpus(), 8)
 */
static int get_default_thread_count(void) {
    const int cap = 8;
    int cpus = get_available_cpus();

    if (cpus > cap) {
        return cap;
    }
    return cpus;
}
|
||||
|
||||
// =============================================================================
|
||||
// CRC-32
|
||||
@@ -138,6 +186,23 @@ typedef struct {
|
||||
// Options
|
||||
int verbose;
|
||||
int dump_mode; // Just dump packets, don't decode
|
||||
|
||||
// Multithreading
|
||||
int num_threads;
|
||||
int num_slots;
|
||||
gop_decode_job_t *slots;
|
||||
tav_video_context_t **worker_video_ctx; // Per-thread decoder contexts
|
||||
pthread_t *worker_threads;
|
||||
pthread_mutex_t mutex;
|
||||
pthread_cond_t cond_job_available;
|
||||
pthread_cond_t cond_slot_free;
|
||||
volatile int threads_should_exit;
|
||||
volatile int next_write_slot; // Next slot to write to output
|
||||
volatile int jobs_submitted;
|
||||
volatile int jobs_completed;
|
||||
|
||||
// Timing
|
||||
time_t start_time;
|
||||
} dt_decoder_t;
|
||||
|
||||
// =============================================================================
|
||||
@@ -151,6 +216,8 @@ static void print_usage(const char *program) {
|
||||
printf(" -i, --input FILE Input TAV-DT file\n");
|
||||
printf(" -o, --output FILE Output video file (FFV1/MKV)\n");
|
||||
printf("\nOptions:\n");
|
||||
printf(" -t, --threads N Number of decoder threads (default: min(8, available CPUs))\n");
|
||||
printf(" 0 or 1 = single-threaded, 2-16 = multithreaded\n");
|
||||
printf(" --dump Dump packet info without decoding\n");
|
||||
printf(" -v, --verbose Verbose output\n");
|
||||
printf(" --help Show this help\n");
|
||||
@@ -354,6 +421,12 @@ static int decode_audio_subpacket(dt_decoder_t *dec, const uint8_t *data, size_t
|
||||
// Calculate RS payload size
|
||||
size_t rs_total = rs_block_count * RS_BLOCK_SIZE;
|
||||
|
||||
// Handle empty audio packet (no samples in this GOP)
|
||||
if (compressed_size == 0 || rs_block_count == 0 || sample_count == 0) {
|
||||
*consumed = offset;
|
||||
return 0; // Successfully processed empty audio packet
|
||||
}
|
||||
|
||||
if (offset + rs_total > data_len) {
|
||||
if (dec->verbose) {
|
||||
fprintf(stderr, "Warning: Audio packet truncated\n");
|
||||
@@ -386,8 +459,13 @@ static int decode_audio_subpacket(dt_decoder_t *dec, const uint8_t *data, size_t
|
||||
// [sample_count(2)][max_index(1)][payload_size(4)][zstd_data]
|
||||
// No need to rebuild the header - pass it directly to the TAD decoder
|
||||
|
||||
// Decode TAD to PCMu8
|
||||
uint8_t *pcmu8_output = malloc(sample_count * 2);
|
||||
// Read the actual sample count from the TAD chunk header (not the wrapper header)
|
||||
// The wrapper header sample_count might be incorrect or 0 in some cases
|
||||
uint16_t tad_chunk_sample_count;
|
||||
memcpy(&tad_chunk_sample_count, decoded_payload, 2);
|
||||
|
||||
// Decode TAD to PCMu8 - allocate based on TAD chunk's sample count
|
||||
uint8_t *pcmu8_output = malloc(tad_chunk_sample_count * 2);
|
||||
if (!pcmu8_output) {
|
||||
free(rs_data);
|
||||
free(decoded_payload);
|
||||
@@ -717,6 +795,231 @@ static int spawn_ffmpeg(dt_decoder_t *dec) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Multithreading Support
|
||||
// =============================================================================
|
||||
|
||||
// Worker thread function - decodes GOPs in parallel
|
||||
static void *decoder_worker_thread(void *arg) {
|
||||
dt_decoder_t *dec = (dt_decoder_t *)arg;
|
||||
|
||||
// Get thread index by finding our thread ID in the array
|
||||
int thread_idx = -1;
|
||||
pthread_t self = pthread_self();
|
||||
for (int i = 0; i < dec->num_threads; i++) {
|
||||
if (pthread_equal(dec->worker_threads[i], self)) {
|
||||
thread_idx = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (thread_idx < 0) thread_idx = 0; // Fallback
|
||||
|
||||
tav_video_context_t *my_video_ctx = dec->worker_video_ctx[thread_idx];
|
||||
|
||||
while (1) {
|
||||
pthread_mutex_lock(&dec->mutex);
|
||||
|
||||
// Find a pending slot to work on
|
||||
int slot_idx = -1;
|
||||
while (slot_idx < 0 && !dec->threads_should_exit) {
|
||||
for (int i = 0; i < dec->num_slots; i++) {
|
||||
if (dec->slots[i].status == DECODE_SLOT_PENDING &&
|
||||
dec->slots[i].compressed_data != NULL) {
|
||||
dec->slots[i].status = DECODE_SLOT_DONE; // Claim it temporarily
|
||||
slot_idx = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (slot_idx < 0 && !dec->threads_should_exit) {
|
||||
pthread_cond_wait(&dec->cond_job_available, &dec->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
if (dec->threads_should_exit && slot_idx < 0) {
|
||||
pthread_mutex_unlock(&dec->mutex);
|
||||
break;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&dec->mutex);
|
||||
|
||||
if (slot_idx < 0) continue;
|
||||
|
||||
gop_decode_job_t *job = &dec->slots[slot_idx];
|
||||
|
||||
// Decode GOP using our thread's decoder context
|
||||
job->decode_result = tav_video_decode_gop(
|
||||
my_video_ctx,
|
||||
job->compressed_data,
|
||||
job->compressed_size,
|
||||
job->gop_size,
|
||||
job->rgb_frames
|
||||
);
|
||||
|
||||
// Free compressed data
|
||||
free(job->compressed_data);
|
||||
job->compressed_data = NULL;
|
||||
|
||||
// Mark as done
|
||||
pthread_mutex_lock(&dec->mutex);
|
||||
job->status = DECODE_SLOT_DONE;
|
||||
dec->jobs_completed++;
|
||||
pthread_cond_broadcast(&dec->cond_slot_free);
|
||||
pthread_mutex_unlock(&dec->mutex);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int init_decoder_threads(dt_decoder_t *dec) {
|
||||
if (dec->num_threads <= 0) {
|
||||
return 0; // Single-threaded mode
|
||||
}
|
||||
|
||||
// Limit threads
|
||||
if (dec->num_threads > MAX_DECODE_THREADS) {
|
||||
dec->num_threads = MAX_DECODE_THREADS;
|
||||
}
|
||||
|
||||
// Number of slots = threads + 2 for pipelining
|
||||
dec->num_slots = dec->num_threads + 2;
|
||||
|
||||
// Allocate slots
|
||||
dec->slots = calloc(dec->num_slots, sizeof(gop_decode_job_t));
|
||||
if (!dec->slots) {
|
||||
fprintf(stderr, "Error: Failed to allocate decode slots\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Allocate frame buffers for each slot
|
||||
int internal_height = dec->is_interlaced ? dec->height / 2 : dec->height;
|
||||
size_t frame_size = dec->width * internal_height * 3;
|
||||
int max_gop_size = 16; // TAV-DT uses fixed 16-frame GOPs
|
||||
|
||||
for (int i = 0; i < dec->num_slots; i++) {
|
||||
dec->slots[i].rgb_frames = malloc(max_gop_size * sizeof(uint8_t*));
|
||||
if (!dec->slots[i].rgb_frames) {
|
||||
fprintf(stderr, "Error: Failed to allocate frame pointers for slot %d\n", i);
|
||||
return -1;
|
||||
}
|
||||
for (int f = 0; f < max_gop_size; f++) {
|
||||
dec->slots[i].rgb_frames[f] = malloc(frame_size);
|
||||
if (!dec->slots[i].rgb_frames[f]) {
|
||||
fprintf(stderr, "Error: Failed to allocate frame buffer for slot %d\n", i);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
dec->slots[i].frames_allocated = max_gop_size;
|
||||
dec->slots[i].status = DECODE_SLOT_EMPTY;
|
||||
dec->slots[i].job_id = -1;
|
||||
}
|
||||
|
||||
// Create per-thread video decoder contexts
|
||||
dec->worker_video_ctx = malloc(dec->num_threads * sizeof(tav_video_context_t*));
|
||||
if (!dec->worker_video_ctx) {
|
||||
fprintf(stderr, "Error: Failed to allocate worker video contexts\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
tav_video_params_t video_params = {
|
||||
.width = dec->width,
|
||||
.height = internal_height,
|
||||
.decomp_levels = DT_SPATIAL_LEVELS,
|
||||
.temporal_levels = DT_TEMPORAL_LEVELS,
|
||||
.wavelet_filter = 1, // CDF 9/7
|
||||
.temporal_wavelet = 255, // Haar
|
||||
.entropy_coder = 1, // EZBC
|
||||
.channel_layout = 0, // YCoCg-R
|
||||
.perceptual_tuning = 1,
|
||||
.quantiser_y = QUALITY_Y[dec->quality_index],
|
||||
.quantiser_co = QUALITY_CO[dec->quality_index],
|
||||
.quantiser_cg = QUALITY_CG[dec->quality_index],
|
||||
.encoder_preset = 0x01, // Sports
|
||||
.monoblock = 1
|
||||
};
|
||||
|
||||
for (int i = 0; i < dec->num_threads; i++) {
|
||||
dec->worker_video_ctx[i] = tav_video_create(&video_params);
|
||||
if (!dec->worker_video_ctx[i]) {
|
||||
fprintf(stderr, "Error: Failed to create video context for thread %d\n", i);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize synchronization primitives
|
||||
pthread_mutex_init(&dec->mutex, NULL);
|
||||
pthread_cond_init(&dec->cond_job_available, NULL);
|
||||
pthread_cond_init(&dec->cond_slot_free, NULL);
|
||||
dec->threads_should_exit = 0;
|
||||
dec->next_write_slot = 0;
|
||||
dec->jobs_submitted = 0;
|
||||
dec->jobs_completed = 0;
|
||||
|
||||
// Create worker threads
|
||||
dec->worker_threads = malloc(dec->num_threads * sizeof(pthread_t));
|
||||
if (!dec->worker_threads) {
|
||||
fprintf(stderr, "Error: Failed to allocate worker threads\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < dec->num_threads; i++) {
|
||||
if (pthread_create(&dec->worker_threads[i], NULL, decoder_worker_thread, dec) != 0) {
|
||||
fprintf(stderr, "Error: Failed to create worker thread %d\n", i);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (dec->verbose) {
|
||||
printf("Initialized %d decoder worker threads with %d slots\n",
|
||||
dec->num_threads, dec->num_slots);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
 * Stop the worker threads and release everything owned by the thread pool.
 *
 * Does nothing in single-threaded mode (num_threads <= 0). Each resource is
 * checked for NULL before it is touched and is set to NULL afterwards, so the
 * function tolerates a pool that was only partially set up and may be called
 * more than once.
 *
 * The mutex and condition variables are only used and destroyed when the
 * worker thread array exists: init_decoder_threads() initializes them right
 * before allocating that array, so without it there are no threads to signal
 * and the primitives may never have been initialized.
 *
 * @param dec  decoder state whose thread pool should be torn down
 */
static void cleanup_decoder_threads(dt_decoder_t *dec) {
    if (dec->num_threads <= 0) return;

    if (dec->worker_threads) {
        // Signal threads to exit
        pthread_mutex_lock(&dec->mutex);
        dec->threads_should_exit = 1;
        pthread_cond_broadcast(&dec->cond_job_available);
        pthread_mutex_unlock(&dec->mutex);

        // Wait for threads to finish
        for (int i = 0; i < dec->num_threads; i++) {
            pthread_join(dec->worker_threads[i], NULL);
        }
        free(dec->worker_threads);
        dec->worker_threads = NULL;

        // Destroy sync primitives (no thread can be using them after the joins)
        pthread_mutex_destroy(&dec->mutex);
        pthread_cond_destroy(&dec->cond_job_available);
        pthread_cond_destroy(&dec->cond_slot_free);
    }

    // Free per-thread video contexts
    if (dec->worker_video_ctx) {
        for (int i = 0; i < dec->num_threads; i++) {
            if (dec->worker_video_ctx[i]) {
                tav_video_free(dec->worker_video_ctx[i]);
            }
        }
        free(dec->worker_video_ctx);
        dec->worker_video_ctx = NULL;
    }

    // Free slots
    if (dec->slots) {
        for (int i = 0; i < dec->num_slots; i++) {
            gop_decode_job_t *slot = &dec->slots[i];

            if (slot->rgb_frames) {
                for (int f = 0; f < slot->frames_allocated; f++) {
                    free(slot->rgb_frames[f]);
                }
                free(slot->rgb_frames);
            }
            // Compressed data of a job that was queued but never decoded
            free(slot->compressed_data);
        }
        free(dec->slots);
        dec->slots = NULL;
    }
}
|
||||
|
||||
// =============================================================================
|
||||
// Main Decoding Loop
|
||||
// =============================================================================
|
||||
@@ -860,6 +1163,9 @@ int main(int argc, char **argv) {
|
||||
dt_decoder_t dec;
|
||||
memset(&dec, 0, sizeof(dec));
|
||||
|
||||
// Default thread count
|
||||
dec.num_threads = get_default_thread_count();
|
||||
|
||||
// Initialize FEC libraries
|
||||
rs_init();
|
||||
ldpc_init();
|
||||
@@ -867,6 +1173,7 @@ int main(int argc, char **argv) {
|
||||
static struct option long_options[] = {
|
||||
{"input", required_argument, 0, 'i'},
|
||||
{"output", required_argument, 0, 'o'},
|
||||
{"threads", required_argument, 0, 't'},
|
||||
{"dump", no_argument, 0, 'd'},
|
||||
{"verbose", no_argument, 0, 'v'},
|
||||
{"help", no_argument, 0, 'h'},
|
||||
@@ -874,7 +1181,7 @@ int main(int argc, char **argv) {
|
||||
};
|
||||
|
||||
int opt;
|
||||
while ((opt = getopt_long(argc, argv, "i:o:dvh", long_options, NULL)) != -1) {
|
||||
while ((opt = getopt_long(argc, argv, "i:o:t:dvh", long_options, NULL)) != -1) {
|
||||
switch (opt) {
|
||||
case 'i':
|
||||
dec.input_file = optarg;
|
||||
@@ -882,6 +1189,17 @@ int main(int argc, char **argv) {
|
||||
case 'o':
|
||||
dec.output_file = optarg;
|
||||
break;
|
||||
case 't': {
|
||||
int threads = atoi(optarg);
|
||||
if (threads < 0) {
|
||||
fprintf(stderr, "Error: Thread count must be positive\n");
|
||||
return 1;
|
||||
}
|
||||
// Both 0 and 1 mean single-threaded (use value 0 internally)
|
||||
dec.num_threads = (threads <= 1) ? 0 : threads;
|
||||
if (dec.num_threads > MAX_DECODE_THREADS) dec.num_threads = MAX_DECODE_THREADS;
|
||||
break;
|
||||
}
|
||||
case 'd':
|
||||
dec.dump_mode = 1;
|
||||
break;
|
||||
|
||||
Reference in New Issue
Block a user