mirror of
https://github.com/curioustorvald/tsvm.git
synced 2026-03-07 11:51:49 +09:00
TAV-DT multithreaded decoding
This commit is contained in:
@@ -71,25 +71,26 @@ static const int QUALITY_CG[] = {148, 133, 113, 99, 76, 39};
|
|||||||
// Multithreading Structures
|
// Multithreading Structures
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
|
|
||||||
#define DECODE_SLOT_EMPTY 0
|
#define DECODE_SLOT_EMPTY 0
|
||||||
#define DECODE_SLOT_PENDING 1
|
#define DECODE_SLOT_PENDING 1
|
||||||
#define DECODE_SLOT_DONE 2
|
#define DECODE_SLOT_PROCESSING 2
|
||||||
|
#define DECODE_SLOT_DONE 3
|
||||||
|
|
||||||
// GOP decode job structure
|
// GOP decode job structure
|
||||||
typedef struct {
|
typedef struct {
|
||||||
// Input
|
// Input
|
||||||
uint8_t *compressed_data; // Raw GOP data to decode
|
uint8_t *compressed_data; // Raw GOP data to decode (owned by job)
|
||||||
size_t compressed_size;
|
size_t compressed_size;
|
||||||
int gop_size; // Number of frames in this GOP
|
int gop_size; // Number of frames in this GOP
|
||||||
int job_id; // Sequential job ID for ordering output
|
int job_id; // Sequential job ID for ordering output
|
||||||
|
|
||||||
// Output
|
// Output
|
||||||
uint8_t **rgb_frames; // Decoded RGB24 frames [gop_size]
|
uint8_t **rgb_frames; // Decoded RGB24 frames [gop_size]
|
||||||
int frames_allocated; // How many frames are allocated
|
size_t frame_size; // Size of each frame in bytes
|
||||||
int decode_result; // 0 = success, -1 = error
|
int decode_result; // 0 = success, -1 = error
|
||||||
|
|
||||||
// Status
|
// Status
|
||||||
volatile int status;
|
volatile int status; // DECODE_SLOT_EMPTY, PENDING, or DONE
|
||||||
} gop_decode_job_t;
|
} gop_decode_job_t;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -375,6 +376,217 @@ static int read_and_decode_header(dt_decoder_t *dec, dt_packet_header_t *header)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// =============================================================================
|
||||||
|
// Multithreading Support
|
||||||
|
// =============================================================================
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Worker thread function for parallel GOP decoding
|
||||||
|
*/
|
||||||
|
static void *decoder_worker_thread(void *arg) {
|
||||||
|
dt_decoder_t *dec = (dt_decoder_t *)arg;
|
||||||
|
int thread_id = -1;
|
||||||
|
|
||||||
|
// Find our thread ID
|
||||||
|
for (int i = 0; i < dec->num_threads; i++) {
|
||||||
|
if (pthread_equal(dec->worker_threads[i], pthread_self())) {
|
||||||
|
thread_id = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (thread_id < 0) {
|
||||||
|
fprintf(stderr, "Error: Worker thread couldn't find its ID\n");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
tav_video_context_t *video_ctx = dec->worker_video_ctx[thread_id];
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
pthread_mutex_lock(&dec->mutex);
|
||||||
|
|
||||||
|
// Look for a pending job and claim it
|
||||||
|
int job_idx = -1;
|
||||||
|
for (int i = 0; i < dec->num_slots; i++) {
|
||||||
|
if (dec->slots[i].status == DECODE_SLOT_PENDING) {
|
||||||
|
job_idx = i;
|
||||||
|
dec->slots[i].status = DECODE_SLOT_PROCESSING; // Claim it - prevents other threads from picking it
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (job_idx < 0) {
|
||||||
|
// No jobs available, check if we should exit
|
||||||
|
if (dec->threads_should_exit) {
|
||||||
|
pthread_mutex_unlock(&dec->mutex);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for a job
|
||||||
|
pthread_cond_wait(&dec->cond_job_available, &dec->mutex);
|
||||||
|
pthread_mutex_unlock(&dec->mutex);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&dec->mutex);
|
||||||
|
|
||||||
|
// Decode this GOP
|
||||||
|
gop_decode_job_t *job = &dec->slots[job_idx];
|
||||||
|
|
||||||
|
// The compressed data format: [type(1)][gop_size(1)][size(4)][zstd_data]
|
||||||
|
const uint8_t *zstd_data = job->compressed_data + 6;
|
||||||
|
size_t zstd_size = job->compressed_size > 6 ? job->compressed_size - 6 : 0;
|
||||||
|
|
||||||
|
job->decode_result = tav_video_decode_gop(video_ctx, zstd_data, zstd_size,
|
||||||
|
job->gop_size, job->rgb_frames);
|
||||||
|
|
||||||
|
// Mark as done
|
||||||
|
pthread_mutex_lock(&dec->mutex);
|
||||||
|
job->status = DECODE_SLOT_DONE;
|
||||||
|
dec->jobs_completed++;
|
||||||
|
pthread_cond_broadcast(&dec->cond_slot_free);
|
||||||
|
pthread_mutex_unlock(&dec->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize decoder threads
|
||||||
|
*/
|
||||||
|
static int init_decoder_threads(dt_decoder_t *dec) {
|
||||||
|
if (dec->num_threads <= 1) {
|
||||||
|
return 0; // Single-threaded, nothing to initialize
|
||||||
|
}
|
||||||
|
|
||||||
|
dec->num_slots = dec->num_threads + 2; // Pipeline with lookahead
|
||||||
|
dec->slots = calloc(dec->num_slots, sizeof(gop_decode_job_t));
|
||||||
|
if (!dec->slots) {
|
||||||
|
fprintf(stderr, "Error: Cannot allocate decode slots\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize slots
|
||||||
|
for (int i = 0; i < dec->num_slots; i++) {
|
||||||
|
dec->slots[i].status = DECODE_SLOT_EMPTY;
|
||||||
|
dec->slots[i].job_id = -1;
|
||||||
|
dec->slots[i].rgb_frames = NULL;
|
||||||
|
dec->slots[i].compressed_data = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create per-thread video decoder contexts
|
||||||
|
dec->worker_video_ctx = calloc(dec->num_threads, sizeof(tav_video_context_t*));
|
||||||
|
if (!dec->worker_video_ctx) {
|
||||||
|
free(dec->slots);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tav_video_params_t vparams;
|
||||||
|
vparams.width = dec->width;
|
||||||
|
vparams.height = dec->is_interlaced ? dec->height / 2 : dec->height;
|
||||||
|
vparams.decomp_levels = DT_SPATIAL_LEVELS;
|
||||||
|
vparams.temporal_levels = DT_TEMPORAL_LEVELS;
|
||||||
|
vparams.wavelet_filter = 1; // CDF 9/7
|
||||||
|
vparams.temporal_wavelet = 255; // Haar
|
||||||
|
vparams.entropy_coder = 1; // EZBC
|
||||||
|
vparams.channel_layout = 0; // YCoCg-R
|
||||||
|
vparams.perceptual_tuning = 1;
|
||||||
|
vparams.quantiser_y = QUALITY_Y[dec->quality_index];
|
||||||
|
vparams.quantiser_co = QUALITY_CO[dec->quality_index];
|
||||||
|
vparams.quantiser_cg = QUALITY_CG[dec->quality_index];
|
||||||
|
vparams.encoder_preset = 0x01; // Sports
|
||||||
|
vparams.monoblock = 1;
|
||||||
|
|
||||||
|
for (int i = 0; i < dec->num_threads; i++) {
|
||||||
|
dec->worker_video_ctx[i] = tav_video_create(&vparams);
|
||||||
|
if (!dec->worker_video_ctx[i]) {
|
||||||
|
fprintf(stderr, "Error: Cannot create video decoder for thread %d\n", i);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize threading primitives
|
||||||
|
pthread_mutex_init(&dec->mutex, NULL);
|
||||||
|
pthread_cond_init(&dec->cond_job_available, NULL);
|
||||||
|
pthread_cond_init(&dec->cond_slot_free, NULL);
|
||||||
|
dec->threads_should_exit = 0;
|
||||||
|
dec->next_write_slot = 0;
|
||||||
|
dec->jobs_submitted = 0;
|
||||||
|
dec->jobs_completed = 0;
|
||||||
|
|
||||||
|
// Create worker threads
|
||||||
|
dec->worker_threads = calloc(dec->num_threads, sizeof(pthread_t));
|
||||||
|
if (!dec->worker_threads) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < dec->num_threads; i++) {
|
||||||
|
if (pthread_create(&dec->worker_threads[i], NULL, decoder_worker_thread, dec) != 0) {
|
||||||
|
fprintf(stderr, "Error: Cannot create worker thread %d\n", i);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dec->verbose) {
|
||||||
|
printf("Initialized %d decoder threads\n", dec->num_threads);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup decoder threads
|
||||||
|
*/
|
||||||
|
static void cleanup_decoder_threads(dt_decoder_t *dec) {
|
||||||
|
if (dec->num_threads <= 1) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Signal threads to exit
|
||||||
|
pthread_mutex_lock(&dec->mutex);
|
||||||
|
dec->threads_should_exit = 1;
|
||||||
|
pthread_cond_broadcast(&dec->cond_job_available);
|
||||||
|
pthread_mutex_unlock(&dec->mutex);
|
||||||
|
|
||||||
|
// Wait for threads
|
||||||
|
if (dec->worker_threads) {
|
||||||
|
for (int i = 0; i < dec->num_threads; i++) {
|
||||||
|
pthread_join(dec->worker_threads[i], NULL);
|
||||||
|
}
|
||||||
|
free(dec->worker_threads);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Free video contexts
|
||||||
|
if (dec->worker_video_ctx) {
|
||||||
|
for (int i = 0; i < dec->num_threads; i++) {
|
||||||
|
if (dec->worker_video_ctx[i]) {
|
||||||
|
tav_video_free(dec->worker_video_ctx[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
free(dec->worker_video_ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Free slots
|
||||||
|
if (dec->slots) {
|
||||||
|
for (int i = 0; i < dec->num_slots; i++) {
|
||||||
|
if (dec->slots[i].rgb_frames) {
|
||||||
|
for (int f = 0; f < dec->slots[i].gop_size; f++) {
|
||||||
|
free(dec->slots[i].rgb_frames[f]);
|
||||||
|
}
|
||||||
|
free(dec->slots[i].rgb_frames);
|
||||||
|
}
|
||||||
|
if (dec->slots[i].compressed_data) {
|
||||||
|
free(dec->slots[i].compressed_data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
free(dec->slots);
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_destroy(&dec->mutex);
|
||||||
|
pthread_cond_destroy(&dec->cond_job_available);
|
||||||
|
pthread_cond_destroy(&dec->cond_slot_free);
|
||||||
|
}
|
||||||
|
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
// Subpacket Decoding
|
// Subpacket Decoding
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
@@ -490,6 +702,161 @@ static int decode_audio_subpacket(dt_decoder_t *dec, const uint8_t *data, size_t
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Multithreaded video decoding - submit GOP to worker pool
|
||||||
|
*/
|
||||||
|
static int decode_video_subpacket_mt(dt_decoder_t *dec, const uint8_t *data, size_t data_len,
|
||||||
|
size_t *consumed) {
|
||||||
|
// Minimum: 16 byte LDPC header
|
||||||
|
if (data_len < DT_TAV_HEADER_SIZE * 2) return -1;
|
||||||
|
|
||||||
|
size_t offset = 0;
|
||||||
|
|
||||||
|
// LDPC decode TAV header (16 bytes -> 8 bytes)
|
||||||
|
uint8_t decoded_tav_header[DT_TAV_HEADER_SIZE];
|
||||||
|
int ldpc_result = ldpc_decode(data + offset, DT_TAV_HEADER_SIZE * 2, decoded_tav_header);
|
||||||
|
if (ldpc_result < 0) {
|
||||||
|
if (dec->verbose) {
|
||||||
|
fprintf(stderr, "Warning: LDPC decode failed for TAV header\n");
|
||||||
|
}
|
||||||
|
memcpy(decoded_tav_header, data + offset, DT_TAV_HEADER_SIZE);
|
||||||
|
} else if (ldpc_result > 0) {
|
||||||
|
dec->fec_corrections++;
|
||||||
|
}
|
||||||
|
offset += DT_TAV_HEADER_SIZE * 2;
|
||||||
|
|
||||||
|
// Parse TAV header
|
||||||
|
uint8_t gop_size = decoded_tav_header[0];
|
||||||
|
uint32_t compressed_size;
|
||||||
|
uint32_t rs_block_count;
|
||||||
|
|
||||||
|
memcpy(&compressed_size, decoded_tav_header + 1, 4);
|
||||||
|
rs_block_count = decoded_tav_header[5] |
|
||||||
|
((uint32_t)decoded_tav_header[6] << 8) |
|
||||||
|
((uint32_t)decoded_tav_header[7] << 16);
|
||||||
|
|
||||||
|
// Calculate RS payload size
|
||||||
|
size_t rs_total = rs_block_count * RS_BLOCK_SIZE;
|
||||||
|
|
||||||
|
if (offset + rs_total > data_len) {
|
||||||
|
*consumed = data_len;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// RS decode payload
|
||||||
|
uint8_t *rs_data = malloc(rs_total);
|
||||||
|
if (!rs_data) return -1;
|
||||||
|
memcpy(rs_data, data + offset, rs_total);
|
||||||
|
|
||||||
|
uint8_t *decoded_payload = malloc(compressed_size);
|
||||||
|
if (!decoded_payload) {
|
||||||
|
free(rs_data);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int rs_result = rs_decode_blocks(rs_data, rs_total, decoded_payload, compressed_size);
|
||||||
|
if (rs_result > 0) {
|
||||||
|
dec->fec_corrections += rs_result;
|
||||||
|
}
|
||||||
|
free(rs_data);
|
||||||
|
|
||||||
|
// Lazy initialization of multithreading (after first packet header is known)
|
||||||
|
if (!dec->worker_threads && dec->num_threads > 1) {
|
||||||
|
if (init_decoder_threads(dec) != 0) {
|
||||||
|
fprintf(stderr, "Error: Cannot initialize decoder threads, falling back to single-threaded\n");
|
||||||
|
dec->num_threads = 1;
|
||||||
|
// Fall back to single-threaded decoding for this packet
|
||||||
|
free(decoded_payload);
|
||||||
|
*consumed = offset + rs_total;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (dec->verbose) {
|
||||||
|
printf("Initialized multithreaded decoding: %d threads\n", dec->num_threads);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find an empty slot
|
||||||
|
int slot_idx = -1;
|
||||||
|
pthread_mutex_lock(&dec->mutex);
|
||||||
|
|
||||||
|
while (slot_idx < 0) {
|
||||||
|
// Try to write completed GOPs first
|
||||||
|
for (int i = 0; i < dec->num_slots; i++) {
|
||||||
|
if (dec->slots[i].status == DECODE_SLOT_DONE &&
|
||||||
|
dec->slots[i].job_id == dec->next_write_slot) {
|
||||||
|
|
||||||
|
gop_decode_job_t *job = &dec->slots[i];
|
||||||
|
pthread_mutex_unlock(&dec->mutex);
|
||||||
|
|
||||||
|
// Write frames to temp file
|
||||||
|
if (job->decode_result == 0 && dec->video_temp_fp) {
|
||||||
|
for (int f = 0; f < job->gop_size; f++) {
|
||||||
|
fwrite(job->rgb_frames[f], 1, job->frame_size, dec->video_temp_fp);
|
||||||
|
dec->frames_decoded++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_lock(&dec->mutex);
|
||||||
|
|
||||||
|
// Free job resources while holding mutex
|
||||||
|
for (int f = 0; f < job->gop_size; f++) {
|
||||||
|
free(job->rgb_frames[f]);
|
||||||
|
}
|
||||||
|
free(job->rgb_frames);
|
||||||
|
free(job->compressed_data);
|
||||||
|
|
||||||
|
job->status = DECODE_SLOT_EMPTY;
|
||||||
|
job->rgb_frames = NULL;
|
||||||
|
job->compressed_data = NULL;
|
||||||
|
dec->next_write_slot++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Look for empty slot
|
||||||
|
for (int i = 0; i < dec->num_slots; i++) {
|
||||||
|
if (dec->slots[i].status == DECODE_SLOT_EMPTY) {
|
||||||
|
slot_idx = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (slot_idx < 0) {
|
||||||
|
// Wait for a slot to become available
|
||||||
|
pthread_cond_wait(&dec->cond_slot_free, &dec->mutex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fill the slot
|
||||||
|
gop_decode_job_t *job = &dec->slots[slot_idx];
|
||||||
|
|
||||||
|
int internal_height = dec->is_interlaced ? dec->height / 2 : dec->height;
|
||||||
|
size_t frame_size = dec->width * internal_height * 3;
|
||||||
|
|
||||||
|
job->compressed_data = decoded_payload; // Transfer ownership
|
||||||
|
job->compressed_size = compressed_size;
|
||||||
|
job->gop_size = gop_size;
|
||||||
|
job->job_id = dec->jobs_submitted++;
|
||||||
|
job->frame_size = frame_size;
|
||||||
|
job->decode_result = -1;
|
||||||
|
|
||||||
|
// Allocate frame buffers
|
||||||
|
job->rgb_frames = malloc(gop_size * sizeof(uint8_t*));
|
||||||
|
for (int i = 0; i < gop_size; i++) {
|
||||||
|
job->rgb_frames[i] = malloc(frame_size);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Submit job
|
||||||
|
job->status = DECODE_SLOT_PENDING;
|
||||||
|
pthread_cond_broadcast(&dec->cond_job_available);
|
||||||
|
pthread_mutex_unlock(&dec->mutex);
|
||||||
|
|
||||||
|
offset += rs_total;
|
||||||
|
*consumed = offset;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int decode_video_subpacket(dt_decoder_t *dec, const uint8_t *data, size_t data_len,
|
static int decode_video_subpacket(dt_decoder_t *dec, const uint8_t *data, size_t data_len,
|
||||||
size_t *consumed) {
|
size_t *consumed) {
|
||||||
// Minimum: 16 byte LDPC header
|
// Minimum: 16 byte LDPC header
|
||||||
@@ -797,229 +1164,6 @@ static int spawn_ffmpeg(dt_decoder_t *dec) {
|
|||||||
|
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
// Multithreading Support
|
// Multithreading Support
|
||||||
// =============================================================================
|
|
||||||
|
|
||||||
// Worker thread function - decodes GOPs in parallel
|
|
||||||
static void *decoder_worker_thread(void *arg) {
|
|
||||||
dt_decoder_t *dec = (dt_decoder_t *)arg;
|
|
||||||
|
|
||||||
// Get thread index by finding our thread ID in the array
|
|
||||||
int thread_idx = -1;
|
|
||||||
pthread_t self = pthread_self();
|
|
||||||
for (int i = 0; i < dec->num_threads; i++) {
|
|
||||||
if (pthread_equal(dec->worker_threads[i], self)) {
|
|
||||||
thread_idx = i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (thread_idx < 0) thread_idx = 0; // Fallback
|
|
||||||
|
|
||||||
tav_video_context_t *my_video_ctx = dec->worker_video_ctx[thread_idx];
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
pthread_mutex_lock(&dec->mutex);
|
|
||||||
|
|
||||||
// Find a pending slot to work on
|
|
||||||
int slot_idx = -1;
|
|
||||||
while (slot_idx < 0 && !dec->threads_should_exit) {
|
|
||||||
for (int i = 0; i < dec->num_slots; i++) {
|
|
||||||
if (dec->slots[i].status == DECODE_SLOT_PENDING &&
|
|
||||||
dec->slots[i].compressed_data != NULL) {
|
|
||||||
dec->slots[i].status = DECODE_SLOT_DONE; // Claim it temporarily
|
|
||||||
slot_idx = i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (slot_idx < 0 && !dec->threads_should_exit) {
|
|
||||||
pthread_cond_wait(&dec->cond_job_available, &dec->mutex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dec->threads_should_exit && slot_idx < 0) {
|
|
||||||
pthread_mutex_unlock(&dec->mutex);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&dec->mutex);
|
|
||||||
|
|
||||||
if (slot_idx < 0) continue;
|
|
||||||
|
|
||||||
gop_decode_job_t *job = &dec->slots[slot_idx];
|
|
||||||
|
|
||||||
// Decode GOP using our thread's decoder context
|
|
||||||
job->decode_result = tav_video_decode_gop(
|
|
||||||
my_video_ctx,
|
|
||||||
job->compressed_data,
|
|
||||||
job->compressed_size,
|
|
||||||
job->gop_size,
|
|
||||||
job->rgb_frames
|
|
||||||
);
|
|
||||||
|
|
||||||
// Free compressed data
|
|
||||||
free(job->compressed_data);
|
|
||||||
job->compressed_data = NULL;
|
|
||||||
|
|
||||||
// Mark as done
|
|
||||||
pthread_mutex_lock(&dec->mutex);
|
|
||||||
job->status = DECODE_SLOT_DONE;
|
|
||||||
dec->jobs_completed++;
|
|
||||||
pthread_cond_broadcast(&dec->cond_slot_free);
|
|
||||||
pthread_mutex_unlock(&dec->mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int init_decoder_threads(dt_decoder_t *dec) {
|
|
||||||
if (dec->num_threads <= 0) {
|
|
||||||
return 0; // Single-threaded mode
|
|
||||||
}
|
|
||||||
|
|
||||||
// Limit threads
|
|
||||||
if (dec->num_threads > MAX_DECODE_THREADS) {
|
|
||||||
dec->num_threads = MAX_DECODE_THREADS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Number of slots = threads + 2 for pipelining
|
|
||||||
dec->num_slots = dec->num_threads + 2;
|
|
||||||
|
|
||||||
// Allocate slots
|
|
||||||
dec->slots = calloc(dec->num_slots, sizeof(gop_decode_job_t));
|
|
||||||
if (!dec->slots) {
|
|
||||||
fprintf(stderr, "Error: Failed to allocate decode slots\n");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Allocate frame buffers for each slot
|
|
||||||
int internal_height = dec->is_interlaced ? dec->height / 2 : dec->height;
|
|
||||||
size_t frame_size = dec->width * internal_height * 3;
|
|
||||||
int max_gop_size = 16; // TAV-DT uses fixed 16-frame GOPs
|
|
||||||
|
|
||||||
for (int i = 0; i < dec->num_slots; i++) {
|
|
||||||
dec->slots[i].rgb_frames = malloc(max_gop_size * sizeof(uint8_t*));
|
|
||||||
if (!dec->slots[i].rgb_frames) {
|
|
||||||
fprintf(stderr, "Error: Failed to allocate frame pointers for slot %d\n", i);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
for (int f = 0; f < max_gop_size; f++) {
|
|
||||||
dec->slots[i].rgb_frames[f] = malloc(frame_size);
|
|
||||||
if (!dec->slots[i].rgb_frames[f]) {
|
|
||||||
fprintf(stderr, "Error: Failed to allocate frame buffer for slot %d\n", i);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
dec->slots[i].frames_allocated = max_gop_size;
|
|
||||||
dec->slots[i].status = DECODE_SLOT_EMPTY;
|
|
||||||
dec->slots[i].job_id = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create per-thread video decoder contexts
|
|
||||||
dec->worker_video_ctx = malloc(dec->num_threads * sizeof(tav_video_context_t*));
|
|
||||||
if (!dec->worker_video_ctx) {
|
|
||||||
fprintf(stderr, "Error: Failed to allocate worker video contexts\n");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tav_video_params_t video_params = {
|
|
||||||
.width = dec->width,
|
|
||||||
.height = internal_height,
|
|
||||||
.decomp_levels = DT_SPATIAL_LEVELS,
|
|
||||||
.temporal_levels = DT_TEMPORAL_LEVELS,
|
|
||||||
.wavelet_filter = 1, // CDF 9/7
|
|
||||||
.temporal_wavelet = 255, // Haar
|
|
||||||
.entropy_coder = 1, // EZBC
|
|
||||||
.channel_layout = 0, // YCoCg-R
|
|
||||||
.perceptual_tuning = 1,
|
|
||||||
.quantiser_y = QUALITY_Y[dec->quality_index],
|
|
||||||
.quantiser_co = QUALITY_CO[dec->quality_index],
|
|
||||||
.quantiser_cg = QUALITY_CG[dec->quality_index],
|
|
||||||
.encoder_preset = 0x01, // Sports
|
|
||||||
.monoblock = 1
|
|
||||||
};
|
|
||||||
|
|
||||||
for (int i = 0; i < dec->num_threads; i++) {
|
|
||||||
dec->worker_video_ctx[i] = tav_video_create(&video_params);
|
|
||||||
if (!dec->worker_video_ctx[i]) {
|
|
||||||
fprintf(stderr, "Error: Failed to create video context for thread %d\n", i);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize synchronization primitives
|
|
||||||
pthread_mutex_init(&dec->mutex, NULL);
|
|
||||||
pthread_cond_init(&dec->cond_job_available, NULL);
|
|
||||||
pthread_cond_init(&dec->cond_slot_free, NULL);
|
|
||||||
dec->threads_should_exit = 0;
|
|
||||||
dec->next_write_slot = 0;
|
|
||||||
dec->jobs_submitted = 0;
|
|
||||||
dec->jobs_completed = 0;
|
|
||||||
|
|
||||||
// Create worker threads
|
|
||||||
dec->worker_threads = malloc(dec->num_threads * sizeof(pthread_t));
|
|
||||||
if (!dec->worker_threads) {
|
|
||||||
fprintf(stderr, "Error: Failed to allocate worker threads\n");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < dec->num_threads; i++) {
|
|
||||||
if (pthread_create(&dec->worker_threads[i], NULL, decoder_worker_thread, dec) != 0) {
|
|
||||||
fprintf(stderr, "Error: Failed to create worker thread %d\n", i);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dec->verbose) {
|
|
||||||
printf("Initialized %d decoder worker threads with %d slots\n",
|
|
||||||
dec->num_threads, dec->num_slots);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void cleanup_decoder_threads(dt_decoder_t *dec) {
|
|
||||||
if (dec->num_threads <= 0) return;
|
|
||||||
|
|
||||||
// Signal threads to exit
|
|
||||||
pthread_mutex_lock(&dec->mutex);
|
|
||||||
dec->threads_should_exit = 1;
|
|
||||||
pthread_cond_broadcast(&dec->cond_job_available);
|
|
||||||
pthread_mutex_unlock(&dec->mutex);
|
|
||||||
|
|
||||||
// Wait for threads to finish
|
|
||||||
for (int i = 0; i < dec->num_threads; i++) {
|
|
||||||
pthread_join(dec->worker_threads[i], NULL);
|
|
||||||
}
|
|
||||||
free(dec->worker_threads);
|
|
||||||
dec->worker_threads = NULL;
|
|
||||||
|
|
||||||
// Free per-thread video contexts
|
|
||||||
for (int i = 0; i < dec->num_threads; i++) {
|
|
||||||
tav_video_free(dec->worker_video_ctx[i]);
|
|
||||||
}
|
|
||||||
free(dec->worker_video_ctx);
|
|
||||||
dec->worker_video_ctx = NULL;
|
|
||||||
|
|
||||||
// Free slots
|
|
||||||
for (int i = 0; i < dec->num_slots; i++) {
|
|
||||||
if (dec->slots[i].rgb_frames) {
|
|
||||||
for (int f = 0; f < dec->slots[i].frames_allocated; f++) {
|
|
||||||
free(dec->slots[i].rgb_frames[f]);
|
|
||||||
}
|
|
||||||
free(dec->slots[i].rgb_frames);
|
|
||||||
}
|
|
||||||
if (dec->slots[i].compressed_data) {
|
|
||||||
free(dec->slots[i].compressed_data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
free(dec->slots);
|
|
||||||
dec->slots = NULL;
|
|
||||||
|
|
||||||
// Destroy sync primitives
|
|
||||||
pthread_mutex_destroy(&dec->mutex);
|
|
||||||
pthread_cond_destroy(&dec->cond_job_available);
|
|
||||||
pthread_cond_destroy(&dec->cond_slot_free);
|
|
||||||
}
|
|
||||||
|
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
// Main Decoding Loop
|
// Main Decoding Loop
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
@@ -1067,8 +1211,13 @@ static int process_packet(dt_decoder_t *dec) {
|
|||||||
// Process TAV subpacket (video comes after audio)
|
// Process TAV subpacket (video comes after audio)
|
||||||
if (header.offset_to_video < header.packet_size) {
|
if (header.offset_to_video < header.packet_size) {
|
||||||
size_t tav_consumed = 0;
|
size_t tav_consumed = 0;
|
||||||
decode_video_subpacket(dec, packet_data + header.offset_to_video,
|
if (dec->num_threads > 1) {
|
||||||
header.packet_size - header.offset_to_video, &tav_consumed);
|
decode_video_subpacket_mt(dec, packet_data + header.offset_to_video,
|
||||||
|
header.packet_size - header.offset_to_video, &tav_consumed);
|
||||||
|
} else {
|
||||||
|
decode_video_subpacket(dec, packet_data + header.offset_to_video,
|
||||||
|
header.packet_size - header.offset_to_video, &tav_consumed);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dec->packets_processed++;
|
dec->packets_processed++;
|
||||||
@@ -1104,6 +1253,9 @@ static int run_decoder(dt_decoder_t *dec) {
|
|||||||
fprintf(stderr, "Warning: Cannot create temp video file, video will be skipped\n");
|
fprintf(stderr, "Warning: Cannot create temp video file, video will be skipped\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Note: Multithreading will be initialized lazily after reading first packet header
|
||||||
|
// (need to know dimensions and quality settings first)
|
||||||
|
|
||||||
// Decode all packets
|
// Decode all packets
|
||||||
if (dec->verbose) {
|
if (dec->verbose) {
|
||||||
printf("Decoding TAV-DT stream...\n");
|
printf("Decoding TAV-DT stream...\n");
|
||||||
@@ -1114,6 +1266,58 @@ static int run_decoder(dt_decoder_t *dec) {
|
|||||||
// Progress is shown in process_packet
|
// Progress is shown in process_packet
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Flush remaining GOPs in multithreaded mode
|
||||||
|
if (dec->num_threads > 1) {
|
||||||
|
pthread_mutex_lock(&dec->mutex);
|
||||||
|
|
||||||
|
// Write all remaining completed GOPs in order
|
||||||
|
while (dec->next_write_slot < dec->jobs_submitted) {
|
||||||
|
int found = -1;
|
||||||
|
for (int i = 0; i < dec->num_slots; i++) {
|
||||||
|
if (dec->slots[i].status == DECODE_SLOT_DONE &&
|
||||||
|
dec->slots[i].job_id == dec->next_write_slot) {
|
||||||
|
found = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (found >= 0) {
|
||||||
|
gop_decode_job_t *job = &dec->slots[found];
|
||||||
|
pthread_mutex_unlock(&dec->mutex);
|
||||||
|
|
||||||
|
// Write frames
|
||||||
|
if (job->decode_result == 0 && dec->video_temp_fp) {
|
||||||
|
for (int f = 0; f < job->gop_size; f++) {
|
||||||
|
fwrite(job->rgb_frames[f], 1, job->frame_size, dec->video_temp_fp);
|
||||||
|
dec->frames_decoded++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_lock(&dec->mutex);
|
||||||
|
|
||||||
|
// Free resources while holding mutex
|
||||||
|
for (int f = 0; f < job->gop_size; f++) {
|
||||||
|
free(job->rgb_frames[f]);
|
||||||
|
}
|
||||||
|
free(job->rgb_frames);
|
||||||
|
free(job->compressed_data);
|
||||||
|
|
||||||
|
job->status = DECODE_SLOT_EMPTY;
|
||||||
|
job->rgb_frames = NULL;
|
||||||
|
job->compressed_data = NULL;
|
||||||
|
dec->next_write_slot++;
|
||||||
|
} else {
|
||||||
|
// Wait for the GOP to complete
|
||||||
|
pthread_cond_wait(&dec->cond_slot_free, &dec->mutex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&dec->mutex);
|
||||||
|
|
||||||
|
// Cleanup threads
|
||||||
|
cleanup_decoder_threads(dec);
|
||||||
|
}
|
||||||
|
|
||||||
// Close temp files for reading by FFmpeg
|
// Close temp files for reading by FFmpeg
|
||||||
if (dec->audio_temp_fp) {
|
if (dec->audio_temp_fp) {
|
||||||
fclose(dec->audio_temp_fp);
|
fclose(dec->audio_temp_fp);
|
||||||
|
|||||||
Reference in New Issue
Block a user