mirror of
https://github.com/curioustorvald/tsvm.git
synced 2026-06-06 13:38:30 +09:00
1835 lines
76 KiB
Python
1835 lines
76 KiB
Python
#!/usr/bin/env python3
"""it2taud.py — Convert ImpulseTracker (.it) to TSVM Taud (.taud)

Usage:
    python3 it2taud.py input.it output.taud [-v] [--no-decompress]
                       [--no-pf-envelope]

Limits:
  - Up to 20 IT channels (excess dropped; hard error if chunk count
    × channel count > 4095).
  - IT patterns with >64 rows are split into ⌈rows/64⌉ consecutive
    Taud patterns.  Pattern-loop (SBx) crossing a chunk boundary is
    warned; B/C effects are remapped to new cue indices.
  - IT2.14/IT2.15 compressed samples are decoded unless --no-decompress.
  - IT instrument volume/pan envelopes (up to 12 nodes, sustain loops) are
    converted to Taud format.  NNA actions are ignored.  Each IT instrument
    resolves to its C-5 canonical sample.
  - IT pitch/filter envelope (no Taud equivalent) is BAKED onto a per-
    instrument copy of the canonical sample (--no-pf-envelope to disable).
    Pitch mode uses time-varying linear-interpolated resampling; filter
    mode uses a 2-pole resonant low-pass biquad (RBJ cookbook), an
    approximation of IT's actual filter.  Looped samples are rendered as
    `entry + N×loop_len` with the loop reapplied to the tail.  Caveat:
    the envelope is locked to the sample's playback rate, so playing
    the instrument an octave up advances the envelope twice as fast.
  - AdLib / OPL instruments are skipped.

Effect support:
  A-Z dispatch per TAUD_NOTE_EFFECTS.md.  IT-specific: Cxx is binary
  (not BCD like ST3).  V scales by ×2 (IT 0-128 → Taud 0-255).  X is
  the full 8-bit IT pan.  Y panbrello nibble-repeats.  Z (MIDI macro)
  dropped.  S6x tick-delay dropped.  SAx high-offset dropped.  S7x NNA
  toggles dropped.  Vol-column pitch-slide / tone-porta / vibrato sub-
  commands forwarded to main effect slot when empty; dropped otherwise.
  Per-effect private memory cohorts resolved eagerly (D/K/L share;
  E/F optionally linked with G per flag bit 5).
"""
|
||
|
||
import argparse
|
||
import gzip
|
||
import math
|
||
import struct
|
||
import sys
|
||
|
||
from taud_common import (
|
||
set_verbose, vprint,
|
||
TAUD_MAGIC, TAUD_VERSION, TAUD_HEADER_SIZE, TAUD_SONG_ENTRY,
|
||
SAMPLEBIN_SIZE, INSTBIN_SIZE, SAMPLEINST_SIZE,
|
||
PATTERN_ROWS, PATTERN_BYTES, NUM_PATTERNS_MAX, NUM_CUES, CUE_SIZE, NUM_VOICES,
|
||
NOTE_NOP, NOTE_KEYOFF, NOTE_CUT, TAUD_C3,
|
||
TOP_NONE, TOP_A, TOP_B, TOP_C, TOP_D, TOP_E, TOP_F, TOP_G, TOP_H, TOP_I,
|
||
TOP_J, TOP_K, TOP_L, TOP_O, TOP_Q, TOP_R, TOP_S, TOP_T, TOP_U, TOP_V, TOP_Y,
|
||
SEL_SET, SEL_UP, SEL_DOWN, SEL_FINE,
|
||
EFF_A, EFF_B, EFF_C, EFF_D, EFF_E, EFF_F, EFF_G, EFF_H, EFF_I, EFF_J,
|
||
EFF_K, EFF_L, EFF_M, EFF_N, EFF_O, EFF_P, EFF_Q, EFF_R, EFF_S, EFF_T,
|
||
EFF_U, EFF_V, EFF_W, EFF_X, EFF_Y, EFF_Z,
|
||
J_SEMI_TABLE,
|
||
d_arg_to_col, resample_linear, encode_cue, deduplicate_patterns,
|
||
normalise_sample,
|
||
)
|
||
|
||
|
||
# ── IT constants ─────────────────────────────────────────────────────────────

# Four-byte chunk magics compared against the file by the parsers below:
# module header, sample header and instrument header respectively.
IT_MAGIC      = b'IMPM'
IT_SMP_MAGIC  = b'IMPS'
IT_INST_MAGIC = b'IMPI'

# Special pattern note values.
IT_NOTE_OFF  = 255
IT_NOTE_CUT  = 254
IT_NOTE_FADE = 246   # treated as key-off

# Special order-list entries.
IT_ORD_END  = 255
IT_ORD_SKIP = 254

# Module header flag bits (tested against the 16-bit flags word at 0x2C).
IT_FLAG_STEREO      = 0x01
IT_FLAG_USE_INST    = 0x04
IT_FLAG_LINEAR      = 0x08
IT_FLAG_OLD_EFFECTS = 0x10
IT_FLAG_LINK_GEF    = 0x20   # link G memory with E/F

# Sample flags (Flg byte at IMPS+0x12)
IT_SMP_ASSOC      = 0x01
IT_SMP_16BIT      = 0x02
IT_SMP_STEREO     = 0x04
IT_SMP_COMPRESSED = 0x08
IT_SMP_LOOP       = 0x10
IT_SMP_SUS_LOOP   = 0x20
IT_SMP_PINGPONG   = 0x40

# Vol-column byte ranges (inclusive lower, inclusive upper)
VC_VOL_LO,    VC_VOL_HI    = 0,   64
VC_FVUP_LO,   VC_FVUP_HI   = 65,  74   # fine vol up   A  (value = vc-64,  1..10)
VC_FVDN_LO,   VC_FVDN_HI   = 75,  84   # fine vol down B  (value = vc-74,  1..10)
VC_VUP_LO,    VC_VUP_HI    = 85,  94   # vol slide up  C  (value = vc-84,  1..10)
VC_VDN_LO,    VC_VDN_HI    = 95,  104  # vol slide dn  D  (value = vc-94,  1..10)
VC_PDN_LO,    VC_PDN_HI    = 105, 114  # pitch dn      E  (value = vc-104, 1..10)
VC_PUP_LO,    VC_PUP_HI    = 115, 124  # pitch up      F  (value = vc-114, 1..10)
VC_PAN_LO,    VC_PAN_HI    = 128, 192  # set pan 0..64    (value = vc-128)
VC_TPORTA_LO, VC_TPORTA_HI = 193, 202  # tone porta    G
VC_VIB_LO,    VC_VIB_HI    = 203, 212  # vibrato       H  (depth 1..10)

# Ten-entry lookup, one per vol-column tone-porta code (193..202 spans ten
# values).  NOTE(review): presumably maps the code to the equivalent Gxx
# speed — confirm against the effect converter, which is outside this view.
VC_TPORTA_TABLE = (0, 1, 4, 8, 16, 32, 64, 96, 128, 255)

# IT effects that recall last non-zero arg (per-effect-private, with cohort exceptions).
# V (Set Global Volume) recalls in IT compat mode — the first V $00 resolves to the
# header's global_vol, not literal 0.  Without this, songs starting with V $00 silence.
IT_MEM_EFFECTS = frozenset({
    EFF_D, EFF_E, EFF_F, EFF_G, EFF_H, EFF_I, EFF_J,
    EFF_K, EFF_L, EFF_N, EFF_O, EFF_P, EFF_Q, EFF_R,
    EFF_S, EFF_T, EFF_U, EFF_V, EFF_W, EFF_X, EFF_Y,
})
|
||
|
||
|
||
# ── Taud constants (it-specific) ──────────────────────────────────────────────
|
||
|
||
# Converter signature written into the Taud file.  The field is documented as
# exactly 14 bytes, so the width is enforced here (space-padded) instead of
# relying on the number of trailing spaces typed into the literal.
SIGNATURE = b'it2taud/TSVM'.ljust(14)   # 14 bytes
|
||
|
||
# ThreeFiveMiniUfloat LUT — 256 entries, seconds 0.0..126.0 (must match Kotlin).
#
# The table is regular, so it is generated instead of being spelled out:
#   index = (e << 5) | m   with e in 0..7 and m in 0..31
#   e == 0 : m / 32                       (0.0 .. 0.96875, step 1/32)
#   e >= 1 : (32 + m) * 2**(e - 1) / 32   (1.0 .. 126.0, step doubles per e)
# Every entry is a dyadic rational, so the generated floats are exact.
_MINUFLOAT_LUT = [m / 32.0 for m in range(32)] + [
    (32 + m) * (1 << (e - 1)) / 32.0
    for e in range(1, 8)
    for m in range(32)
]
|
||
|
||
def _nearest_minifloat(sec: float) -> int:
    """Map a duration in seconds to the closest ThreeFiveMiniUfloat index.

    Returns an index 0..255 into ``_MINUFLOAT_LUT``.  Durations at or below
    zero give 0 and durations of 126 s or more give 255.  When ``sec`` lies
    exactly halfway between two entries the lower index is returned.
    """
    if sec <= 0.0:
        return 0
    if sec >= 126.0:
        return 255

    table = _MINUFLOAT_LUT
    low = 0
    high = len(table) - 1
    # Lower-bound binary search: afterwards `low` is the first index whose
    # entry is not smaller than `sec`.
    while low < high:
        centre = (low + high) // 2
        if table[centre] < sec:
            low = centre + 1
        else:
            high = centre

    if low == 0:
        return low
    # The nearest entry is either that bound or its left neighbour.
    gap_below = abs(table[low - 1] - sec)
    gap_above = abs(table[low] - sec)
    return low - 1 if gap_below <= gap_above else low
|
||
|
||
|
||
# ── IT header parser ──────────────────────────────────────────────────────────
|
||
|
||
class ITHeader:
    """Record of the IT module header fields filled in by parse_it_header().

    Attributes are restricted to the names in ``__slots__``; the class has no
    behaviour of its own.
    """

    __slots__ = (
        # song title and table sizes
        'title', 'ord_count', 'ins_count', 'smp_count', 'pat_count',
        # tracker version words, raw flag words
        'cwt', 'cmwt', 'flags', 'special',
        # global mix / timing bytes
        'global_vol', 'mix_vol', 'initial_speed', 'initial_tempo',
        # pan separation and booleans decoded from `flags`
        'pan_sep', 'linear_slides', 'use_instruments',
        'link_gef', 'old_effects',
        # per-channel initial pan / volume (64 entries each)
        'chnl_pan', 'chnl_vol',
        # order list and file offsets of instruments / samples / patterns
        'order_list', 'ins_ptrs', 'smp_ptrs', 'pat_ptrs',
    )
|
||
|
||
def parse_it_header(data: bytes) -> ITHeader:
    """Parse the IT module header, order list and pointer tables.

    Args:
        data: the complete contents of the .it file.

    Returns:
        A populated ITHeader.

    Exits (via ``sys.exit``) with a one-line message when the file is shorter
    than the fixed 0xC0-byte header, does not start with the IT magic, or is
    too short to hold the order list and the instrument / sample / pattern
    pointer tables announced by the header counts.
    """
    if len(data) < 0xC0:
        sys.exit("error: file too short to be IT")
    if data[0:4] != IT_MAGIC:
        sys.exit("error: not an IT file (bad magic)")

    h = ITHeader()
    h.title = data[0x04:0x1E].rstrip(b'\x00').decode('latin-1', errors='replace')

    # Eight consecutive little-endian words at 0x20..0x2F.
    (h.ord_count, h.ins_count, h.smp_count, h.pat_count,
     h.cwt, h.cmwt, h.flags, h.special) = struct.unpack_from('<8H', data, 0x20)

    # Five consecutive bytes at 0x30..0x34.
    (h.global_vol, h.mix_vol, h.initial_speed,
     h.initial_tempo, h.pan_sep) = data[0x30:0x35]

    h.linear_slides   = bool(h.flags & IT_FLAG_LINEAR)
    h.use_instruments = bool(h.flags & IT_FLAG_USE_INST)
    h.link_gef        = bool(h.flags & IT_FLAG_LINK_GEF)
    h.old_effects     = bool(h.flags & IT_FLAG_OLD_EFFECTS)

    h.chnl_pan = list(data[0x40:0x80])
    h.chnl_vol = list(data[0x80:0xC0])

    # Variable-length tables follow the fixed header back to back:
    # order list (1 byte each), then three tables of 32-bit file offsets.
    ord_off = 0xC0
    ins_off = ord_off + h.ord_count
    smp_off = ins_off + 4 * h.ins_count
    pat_off = smp_off + 4 * h.smp_count
    tables_end = pat_off + 4 * h.pat_count
    if tables_end > len(data):
        # Without this check a truncated file surfaces as a raw struct.error
        # (or a silently shortened order list) instead of a clear diagnostic.
        sys.exit("error: IT header tables truncated")

    h.order_list = list(data[ord_off:ins_off])
    h.ins_ptrs = list(struct.unpack_from(f'<{h.ins_count}I', data, ins_off))
    h.smp_ptrs = list(struct.unpack_from(f'<{h.smp_count}I', data, smp_off))
    h.pat_ptrs = list(struct.unpack_from(f'<{h.pat_count}I', data, pat_off))
    return h
|
||
|
||
|
||
# ── IT2.14 / IT2.15 sample decompressor ──────────────────────────────────────
|
||
|
||
def _wrap8(v: int) -> int:
    """Reduce an integer to the signed 8-bit range -128..127, wrapping on
    overflow the way a C int8 does."""
    # Bias so the signed range maps onto 0..255, mask, then remove the bias.
    return ((v + 0x80) & 0xFF) - 0x80
|
||
|
||
def _wrap16(v: int) -> int:
    """Reduce an integer to the signed 16-bit range -32768..32767, wrapping
    on overflow."""
    # Bias so the signed range maps onto 0..65535, mask, then remove the bias.
    return ((v + 0x8000) & 0xFFFF) - 0x8000
|
||
|
||
def _sign_extend(val: int, bits: int) -> int:
    """Interpret the low `bits` bits of `val` as a two's-complement number.

    Bits above position ``bits - 1`` are ignored.
    """
    sign = 1 << (bits - 1)
    field = val & ((sign << 1) - 1)     # keep only the `bits`-wide field
    if field & sign:
        return field - (sign << 1)
    return field
|
||
|
||
def _it214_decompress_block(payload: bytes, num_samples: int,
                            is_16bit: bool, is_it215: bool) -> list:
    """Decode one IT2.14/IT2.15 compressed block.

    Args:
        payload:     the block's bitstream (without its 2-byte length prefix).
                     Bits are consumed LSB-first; reads past the end of the
                     payload yield zero bits.
        num_samples: number of output samples to produce.
        is_16bit:    True for 16-bit samples (initial width 17), False for
                     8-bit samples (initial width 9).
        is_it215:    True to emit the second-order integral (IT2.15 double
                     delta), False to emit the first-order integral (IT2.14).

    Returns:
        List of `num_samples` signed ints wrapped to the sample width.

    The stream is a sequence of `width`-bit codes; each is either a delta
    value or an escape that changes `width`:

      Mode A (width 1..6):   the single code 1<<(width-1) is an escape,
                             followed by 3 (8-bit) or 4 (16-bit) bits holding
                             the new width.
      Mode B (width 7..N):   a window of 8 (8-bit) / 16 (16-bit) codes
                             centred on 1<<(width-1) are escapes.
      Mode C (width N+1):    the top bit set marks an escape; the low bits
                             hold new width - 1.

    In modes A and B the new width skips the current one (a value >= the
    current width is incremented).  Every non-escape code is sign-extended
    from `width` bits *as is*: the reference decoders (libxmp, OpenMPT,
    Schism) do not shift codes that lie above the Mode-B escape window, so
    neither does this one.
    """
    bit_buf = 0
    bit_cnt = 0
    byte_pos = 0
    payload_len = len(payload)

    def read_bits(n):
        # Refill a byte at a time (zero once the payload runs out), then
        # peel the low `n` bits off the accumulator.
        nonlocal bit_buf, bit_cnt, byte_pos
        while bit_cnt < n:
            b = payload[byte_pos] if byte_pos < payload_len else 0
            bit_buf |= b << bit_cnt
            byte_pos += 1
            bit_cnt += 8
        val = bit_buf & ((1 << n) - 1)
        bit_buf >>= n
        bit_cnt -= n
        return val

    if is_16bit:
        init_width, range_count, escape_bits = 17, 16, 4
    else:
        init_width, range_count, escape_bits = 9, 8, 3
    border_sub = range_count // 2           # centres the Mode-B window
    mask = (1 << (init_width - 1)) - 1      # 0xFF (8-bit) or 0xFFFF (16-bit)
    half = (mask + 1) >> 1                  # sign bit of the sample width

    width = init_width
    d1 = d2 = 0
    out = []

    while len(out) < num_samples:
        v = read_bits(width)

        if width < 7:
            # Mode A: lone escape code at the sign-bit value.
            top = 1 << (width - 1)
            if v == top:
                new_w = read_bits(escape_bits) + 1
                width = new_w if new_w < width else new_w + 1   # skip-self
                continue
            delta = v - (top << 1) if v & top else v
        elif width < init_width:
            # Mode B: escape window (border, border + range_count].
            #   8-bit:  width=7 → border=63-4=59, width=8 → border=127-4=123
            border = (mask >> (init_width - width)) - border_sub
            if border < v <= border + range_count:
                new_w = v - border
                width = new_w if new_w < width else new_w + 1   # skip-self
                continue
            top = 1 << (width - 1)
            delta = v - (top << 1) if v & top else v
        else:
            # Mode C: top bit of the (init_width)-bit code marks an escape.
            top = 1 << (init_width - 1)
            if v & top:
                width = (v & (top - 1)) + 1
                continue
            # Data occupies the sample width (init_width - 1 bits).
            v &= mask
            delta = v - (mask + 1) if v & half else v

        # Integrate with wrap-around at the sample width.
        d1 = ((d1 + delta + half) & mask) - half
        if is_it215:
            d2 = ((d2 + d1 + half) & mask) - half
            out.append(d2)
        else:
            out.append(d1)

    return out
|
||
|
||
def it214_decompress(blob: bytes, smp_offset: int, num_samples: int,
                     is_16bit: bool, is_it215: bool) -> bytes:
    """Decode an IT2.14/IT2.15 compressed sample into raw signed PCM.

    The data starting at `smp_offset` is a run of blocks, each prefixed with
    a little-endian 16-bit payload length.  A block carries at most 0x8000
    8-bit or 0x4000 16-bit samples.  Decoding stops once `num_samples` have
    been produced or the blob has no room left for another length prefix, so
    the result can be shorter than requested.

    Returns 8-bit output as one byte per sample and 16-bit output as
    little-endian int16.
    """
    per_block = 0x4000 if is_16bit else 0x8000
    blob_len = len(blob)
    decoded = []
    cursor = smp_offset

    while len(decoded) < num_samples and cursor + 2 <= blob_len:
        (payload_len,) = struct.unpack_from('<H', blob, cursor)
        body_start = cursor + 2
        cursor = body_start + payload_len
        wanted = min(num_samples - len(decoded), per_block)
        decoded.extend(
            _it214_decompress_block(blob[body_start:cursor], wanted,
                                    is_16bit, is_it215))

    if not is_16bit:
        return bytes(s & 0xFF for s in decoded)

    pack_i16 = struct.Struct('<h').pack
    return b''.join(pack_i16(max(-32768, min(32767, s))) for s in decoded)
|
||
|
||
|
||
# ── IT sample parser ──────────────────────────────────────────────────────────
|
||
|
||
class ITSample:
    """Record of one IT sample header plus its decoded PCM (see parse_samples).

    Attributes are restricted to the names in ``__slots__``.
    """

    __slots__ = (
        # names and raw header bytes
        'name', 'filename', 'gv', 'vol', 'flags', 'cvt', 'dfp',
        # playback rate, length and loop points
        'c5_speed', 'length', 'loop_beg', 'loop_end',
        # sustain-loop points and file offset of the sample data
        'sus_beg', 'sus_end', 'smp_point',
        # booleans decoded from `flags` / `cvt`
        'has_loop', 'is_16bit', 'is_stereo', 'is_compressed',
        'is_signed',
        # PCM bytes (b'' when no data was loaded)
        'sample_data',
    )
|
||
|
||
def parse_samples(data: bytes, h: ITHeader, decompress: bool) -> list:
    """Parse every IT sample header and load its PCM data.

    Args:
        data:       the complete contents of the .it file.
        h:          parsed module header; `h.smp_ptrs` gives the file offset
                    of each sample header.
        decompress: when False, IT2.14/IT2.15 compressed samples are left
                    without data instead of being decoded.

    Returns:
        A list parallel to `h.smp_ptrs`.  Entries are ITSample objects, or
        None where the pointer is zero / out of range or the header magic is
        wrong.  Whenever PCM is loaded, `length`, `loop_beg` and `loop_end`
        are rewritten to agree with the loaded data.
    """
    samples = []
    # IT2.15 compression is signaled PER-SAMPLE via cvt bit 2 (0x04), not globally
    # via the file's cwt.  Reference: OpenMPT ITTools.cpp, libxmp it_load.c.
    for i, ptr in enumerate(h.smp_ptrs):
        if ptr == 0 or ptr + 0x50 > len(data):
            vprint(f" warning: sample {i+1} pointer {ptr:#x} out of range, skipping")
            samples.append(None)
            continue
        if data[ptr:ptr+4] != IT_SMP_MAGIC:
            vprint(f" warning: sample {i+1} at {ptr:#x} has bad magic, skipping")
            samples.append(None)
            continue

        s = ITSample()
        s.filename = data[ptr+0x04:ptr+0x10].rstrip(b'\x00').decode('latin-1', errors='replace')
        s.gv       = data[ptr+0x11]
        s.flags    = data[ptr+0x12]
        s.vol      = data[ptr+0x13]
        s.name     = data[ptr+0x14:ptr+0x2E].rstrip(b'\x00').decode('latin-1', errors='replace')
        s.cvt      = data[ptr+0x2E]
        s.dfp      = data[ptr+0x2F]
        (s.length, s.loop_beg, s.loop_end, c5_speed,
         s.sus_beg, s.sus_end, s.smp_point) = struct.unpack_from('<7I', data, ptr+0x30)
        s.c5_speed = c5_speed or 8363     # a zero rate falls back to 8363 Hz

        s.has_loop      = bool(s.flags & IT_SMP_LOOP)
        s.is_16bit      = bool(s.flags & IT_SMP_16BIT)
        s.is_stereo     = bool(s.flags & IT_SMP_STEREO)
        s.is_compressed = bool(s.flags & IT_SMP_COMPRESSED)
        s.is_signed     = bool(s.cvt & 0x01)
        s.sample_data   = b''

        pcm = None      # stays None when no data could be loaded
        if (s.flags & IT_SMP_ASSOC) and s.length > 0:
            if s.is_compressed:
                if not decompress:
                    vprint(f" warning: '{s.name}' is IT2.14 compressed, --no-decompress → silent")
                else:
                    # Best effort: a sample that fails to decode is kept, silent.
                    try:
                        is_it215 = bool(s.cvt & 0x04)
                        raw = it214_decompress(data, s.smp_point, s.length,
                                               s.is_16bit, is_it215)
                        pcm = normalise_sample(raw, True,
                                               s.is_16bit, s.is_stereo, s.name)
                    except Exception as e:
                        vprint(f" warning: '{s.name}' decompression failed ({e}), silent")
            else:
                byte_len = s.length * (2 if s.is_16bit else 1) * (2 if s.is_stereo else 1)
                if s.smp_point + byte_len > len(data):
                    vprint(f" warning: '{s.name}' sample data out of range, zeroing")
                    # Sample bytes are unsigned 8-bit centred on 0x80 (the
                    # envelope bakers read them as `byte - 128`), so silence
                    # is 0x80, not 0x00.
                    pcm = b'\x80' * min(s.length, 256)
                else:
                    raw = data[s.smp_point : s.smp_point + byte_len]
                    pcm = normalise_sample(raw, s.is_signed,
                                           s.is_16bit, s.is_stereo, s.name)

        if pcm is not None:
            # Keep the header fields consistent with what was actually loaded,
            # including the placeholder case: a bogus header length must not
            # outlive the data it described.
            s.sample_data = pcm
            s.length   = len(pcm)
            s.loop_beg = min(s.loop_beg, s.length)
            s.loop_end = min(s.loop_end, s.length)
        samples.append(s)
    return samples
|
||
|
||
|
||
# ── Pitch / filter envelope baker ─────────────────────────────────────────────
|
||
# IT instruments carry a third envelope (pitch or filter, distinguished by
|
||
# flag bit 7) that has no Taud equivalent. We render its effect onto a
|
||
# per-instrument copy of the canonical sample so the instrument can play
|
||
# through Taud's normal sample path with the modulation already baked in.
|
||
#
|
||
# Caveat: the baked envelope's time axis is locked to the sample's playback
|
||
# rate, so playing the instrument an octave up advances the envelope twice
|
||
# as fast (IT's envelope is wall-clock-time, regardless of note pitch). For
|
||
# typical drum/lead use cases this is musically acceptable.
|
||
|
||
def _clone_sample(src: 'ITSample') -> 'ITSample':
    """Return a new ITSample carrying the same value in every slot as `src`.

    `sample_data` is passed through ``bytes()`` so the copy always holds an
    immutable bytes object.
    """
    duplicate = ITSample()
    for field in ITSample.__slots__:
        setattr(duplicate, field, getattr(src, field))
    duplicate.sample_data = bytes(src.sample_data)
    return duplicate
|
||
|
||
|
||
def _plan_baked_length(src: 'ITSample', nodes: list, flags: dict,
                       ticks_per_sec: float) -> tuple:
    """Work out how long a baked sample must be and where its loop sits.

    Returns ``(out_len, loop_beg, loop_end)``:

      * no envelope nodes, or `src` has no usable loop
            → ``(src.length, 0, 0)``
      * looped `src`
            → ``(entry + N*loop_len, entry, entry + N*loop_len)`` where
              `entry` is the loop start and N is the number of loop passes
              needed to cover the envelope's last node, limited to 1..16.

    `flags` is accepted for signature symmetry with the bakers and is not
    consulted.
    """
    unlooped = (src.length, 0, 0)
    if not nodes:
        return unlooped

    # Envelope duration expressed in source samples at the C-5 rate.
    final_tick = nodes[-1][1]
    seconds = final_tick / ticks_per_sec if ticks_per_sec > 0 else 0.0
    env_samples = int(seconds * src.c5_speed)

    if not src.has_loop or src.loop_end <= src.loop_beg:
        return unlooped

    entry = max(0, src.loop_beg)
    loop_len = src.loop_end - src.loop_beg

    # Ceiling division gives the loop passes needed after the intro part.
    still_needed = max(0, env_samples - entry)
    passes = (still_needed + loop_len - 1) // loop_len
    passes = min(max(1, passes), 16)

    total = entry + passes * loop_len
    return total, entry, total
|
||
|
||
|
||
def _read_src_sample(sd: bytes, pos: float, src: 'ITSample') -> float:
    """Read unsigned 8-bit PCM `sd` at fractional position `pos`.

    The result is centred on zero (byte value minus 128) and linearly
    interpolated between the two neighbouring samples.  When `src` has a
    valid loop, positions at or beyond `loop_end` are folded back into
    ``[loop_beg, loop_end)``.  Empty data, negative positions and positions
    at or past the end of the data read as 0.0; a position within the final
    sample returns that sample without interpolation.
    """
    if not sd:
        return 0.0

    if src.has_loop and src.loop_end > src.loop_beg and pos >= src.loop_end:
        loop_len = src.loop_end - src.loop_beg
        pos = src.loop_beg + ((pos - src.loop_beg) % loop_len)

    count = len(sd)
    if pos < 0.0 or pos >= count:
        return 0.0
    if pos >= count - 1:
        # No right-hand neighbour to interpolate towards.
        return float(sd[count - 1] - 128)

    idx = int(pos)
    left = sd[idx] - 128
    right = sd[idx + 1] - 128
    return left + (right - left) * (pos - idx)
|
||
|
||
|
||
def _bake_pitch_envelope(src: 'ITSample', nodes: list, flags: dict,
                         ticks_per_sec: float) -> 'ITSample':
    """Render a pitch envelope into a copy of `src` by variable-rate resampling.

    For every output sample the envelope value (in semitones) at the current
    envelope time sets the read step ``2 ** (semitones / 12)`` through the
    source, which is read with linear interpolation and loop wrapping.
    Envelope time advances by ``ticks_per_sec / c5_speed`` per output sample.

    Returns `src` itself when it has no sample data; otherwise a clone with
    the rendered data, the planned length and loop points, and the loop /
    ping-pong flag bits updated (a baked loop is always forward).
    """
    pcm = src.sample_data
    if not pcm:
        return src

    total, loop_start, loop_stop = _plan_baked_length(src, nodes, flags, ticks_per_sec)
    ticks_step = ticks_per_sec / max(1, src.c5_speed)

    rendered = bytearray(total)
    cursor = 0.0        # fractional read position in the source
    tick = 0.0          # envelope time, accumulated per output sample
    for n in range(total):
        semitones = _env_value_at(tick, nodes, flags)
        step = 2.0 ** (semitones / 12.0)
        level = _read_src_sample(pcm, cursor, src)
        # Back to unsigned 8-bit, clamped.
        rendered[n] = min(255, max(0, int(round(level)) + 128))
        cursor += step
        tick += ticks_step

    baked = _clone_sample(src)
    baked.sample_data = bytes(rendered)
    baked.length = total
    baked.loop_beg = loop_start
    baked.loop_end = loop_stop
    baked.has_loop = loop_start < loop_stop
    if baked.has_loop:
        baked.flags = (baked.flags | IT_SMP_LOOP) & ~IT_SMP_PINGPONG
    else:
        baked.flags = baked.flags & ~(IT_SMP_LOOP | IT_SMP_PINGPONG)
    return baked
|
||
|
||
|
||
def _bake_filter_envelope(src: 'ITSample', nodes: list, flags: dict,
                          ticks_per_sec: float, ifr: int) -> 'ITSample':
    """Render a filter envelope into a copy of `src`.

    Runs the source through a time-varying 2-pole resonant low-pass biquad
    (RBJ cookbook coefficients, recomputed for every sample).  This
    approximates IT's filter; it is not bit-exact to IT's own routine.

      * `ifr` (clamped to 0..127) is mapped linearly onto Q in [0.5, 6.0].
      * The envelope value v sets the cutoff to ``110 * 2 ** ((v + 32) / 8)``
        Hz, limited to [1, c5_speed / 2 - 1].

    Returns `src` itself when it has no sample data; otherwise a clone with
    the filtered data, the planned length and loop points, and the loop /
    ping-pong flag bits updated (a baked loop is always forward).
    """
    pcm = src.sample_data
    if not pcm:
        return src

    total, loop_start, loop_stop = _plan_baked_length(src, nodes, flags, ticks_per_sec)
    rate = max(1, src.c5_speed)
    ticks_step = ticks_per_sec / rate
    ceiling = rate * 0.5 - 1.0

    # Resonance (0..127) → Q ∈ [0.5, 6.0]
    q_factor = 0.5 + (max(0, min(ifr, 127)) / 127.0) * 5.5

    pcm_len = len(pcm)
    wraps = src.has_loop and src.loop_end > src.loop_beg
    rendered = bytearray(total)
    x1 = x2 = 0.0       # previous two inputs
    y1 = y2 = 0.0       # previous two outputs
    tick = 0.0
    for n in range(total):
        env_v = _env_value_at(tick, nodes, flags)
        # Map env value -32..+32 to cutoff: 110 Hz (closed) to ~28 kHz (open)
        cutoff = 110.0 * (2.0 ** ((env_v + 32.0) * 0.125))
        cutoff = max(min(cutoff, ceiling), 1.0)

        w0 = 2.0 * math.pi * cutoff / rate
        cosw = math.cos(w0)
        sinw = math.sin(w0)
        alpha = sinw / (2.0 * q_factor)
        b0 = (1.0 - cosw) * 0.5
        b1 = 1.0 - cosw
        b2 = b0
        a0 = 1.0 + alpha
        a1 = -2.0 * cosw
        a2 = 1.0 - alpha

        # Source index for this output sample, folded into the loop.
        idx = n
        if wraps and idx >= src.loop_end:
            idx = src.loop_beg + ((idx - src.loop_beg) % (src.loop_end - src.loop_beg))
        x0 = float(pcm[idx] - 128) if 0 <= idx < pcm_len else 0.0

        y0 = (b0 * x0 + b1 * x1 + b2 * x2 - a1 * y1 - a2 * y2) / a0
        x2, x1 = x1, x0
        y2, y1 = y1, y0

        # Back to unsigned 8-bit, clamped.
        rendered[n] = min(255, max(0, int(round(y0)) + 128))
        tick += ticks_step

    baked = _clone_sample(src)
    baked.sample_data = bytes(rendered)
    baked.length = total
    baked.loop_beg = loop_start
    baked.loop_end = loop_stop
    baked.has_loop = loop_start < loop_stop
    if baked.has_loop:
        baked.flags = (baked.flags | IT_SMP_LOOP) & ~IT_SMP_PINGPONG
    else:
        baked.flags = baked.flags & ~(IT_SMP_LOOP | IT_SMP_PINGPONG)
    return baked
|
||
|
||
|
||
def _bake_pf_envelope(src: 'ITSample', inst, ticks_per_sec: float) -> 'ITSample':
    """Bake `inst`'s pitch/filter envelope onto `src`.

    Returns `src` unchanged when the instrument has no pitch/filter nodes,
    no flags, or the envelope is not enabled.  Otherwise dispatches to the
    filter baker (passing the instrument's resonance) when the flags mark
    the envelope as a filter envelope, and to the pitch baker if not.
    """
    if not inst.pf_nodes:
        return src
    env_flags = inst.pf_flags
    if not env_flags or not env_flags.get('enabled'):
        return src

    if not env_flags['is_filter']:
        return _bake_pitch_envelope(src, inst.pf_nodes, env_flags, ticks_per_sec)
    return _bake_filter_envelope(src, inst.pf_nodes, env_flags,
                                 ticks_per_sec, inst.ifr)
|
||
|
||
|
||
# ── IT instrument parser ──────────────────────────────────────────────────────
|
||
|
||
class ITInstrument:
    """Record of one parsed IT instrument (see parse_instruments).

    Attributes are restricted to the names in ``__slots__``.
    """

    __slots__ = (
        'name', 'dfp', 'gv', 'canonical_sample', 'canonical_volume',
        'vol_envelope', 'pan_envelope', 'vol_env_sustain', 'pan_env_sustain',
        'pf_nodes', 'pf_flags', 'ifc', 'ifr',
    )
    # vol_envelope / pan_envelope:
    #     list of 12 (value, minifloat_idx) tuples, or None
    # vol_env_sustain / pan_env_sustain:
    #     int (0 = disabled, else (end<<3)|start)
    # pf_nodes:
    #     raw list of (int8 value, uint16 tick) tuples (up to 25), or None
    # pf_flags:
    #     dict {enabled, has_env_loop, has_sus_loop, lpb, lpe, slb, sle,
    #           is_filter, carry}, or None
    # ifc / ifr:
    #     initial filter cutoff / resonance (0..127, or 0 if not set)
|
||
|
||
def parse_instruments(data: bytes, h: ITHeader) -> list:
    """Parse every new-format (cmwt >= 0x200) IT instrument header.

    Args:
        data: the complete contents of the .it file.
        h:    parsed module header; `h.ins_ptrs` gives the file offset of
              each instrument header and `h.initial_tempo` sets the envelope
              tick rate.

    Returns:
        A list parallel to `h.ins_ptrs`.  Entries are ITInstrument objects,
        or None where the pointer is zero, the header would run past the end
        of the file, or the magic is wrong.

    Header layout used here (offsets from the instrument pointer):
        0x18 GbV   0x19 DfP   0x20 name (26 bytes, ends at 0x3A)
        0x3A IFC   0x3B IFR   0x40 note/sample table (120 pairs, 240 bytes)
        0x130 volume envelope, 0x182 pan envelope, 0x1D4 pitch/filter envelope
    """
    from collections import Counter

    # tick rate = bpm×2/5 (50 Hz at 125 BPM); speed is ticks-per-row, irrelevant here
    ticks_per_sec = max(h.initial_tempo * 2.0 / 5.0, 1.0)

    insts = []
    for ptr in h.ins_ptrs:
        # Everything up to the end of the note/sample table (0x130) is read
        # unconditionally below, so all of it must be inside the file.  The
        # envelope parsers do their own bounds checks.
        if ptr == 0 or ptr + 0x130 > len(data):
            insts.append(None)
            continue
        if data[ptr:ptr+4] != IT_INST_MAGIC:
            insts.append(None)
            continue

        inst = ITInstrument()
        inst.name = data[ptr+0x20:ptr+0x3A].rstrip(b'\x00').decode('latin-1', errors='replace')
        inst.gv = data[ptr+0x18]

        # Instrument DfP: bit 7 SET means "don't use default pan" — the
        # opposite sense to the sample header's DfP byte.
        dfp_raw = data[ptr+0x19]
        inst.dfp = None if (dfp_raw & 0x80) else dfp_raw & 0x7F   # None = don't use

        # Keyboard table: 240 bytes at ptr+0x40, 120 pairs of (note, sample_1based).
        # Only the sample byte of each pair is needed; 0 = no sample.
        keyboard = list(data[ptr + 0x41 : ptr + 0x130 : 2])

        # Pick C-5 (note 60) sample; fall back to most-frequent non-zero
        c5_smp = keyboard[60]
        if c5_smp == 0:
            freq = Counter(s for s in keyboard if s != 0)
            c5_smp = freq.most_common(1)[0][0] if freq else 0

        inst.canonical_sample = c5_smp    # 1-based sample index, 0 = none
        # NOTE(review): GbV ranges 0..128 in the IT format, so this clamp
        # treats 64..128 alike — confirm the scale the consumer expects.
        inst.canonical_volume = min(inst.gv, 64)

        # Initial filter cutoff/resonance (high bit = enabled, low 7 bits = value).
        # They follow the 26-byte name, i.e. at 0x3A and 0x3B.
        ifc_raw = data[ptr + 0x3A]
        ifr_raw = data[ptr + 0x3B]
        inst.ifc = ifc_raw & 0x7F if (ifc_raw & 0x80) else 0
        inst.ifr = ifr_raw & 0x7F if (ifr_raw & 0x80) else 0

        # Parse IT envelopes (new-format only, ≥cmwt 0x200)
        # Vol envelope at ptr+0x130; pan envelope at ptr+0x182; pf envelope at ptr+0x1D4
        inst.vol_envelope, inst.vol_env_sustain = _parse_it_envelope(
            data, ptr + 0x130, False, ticks_per_sec)
        inst.pan_envelope, inst.pan_env_sustain = _parse_it_envelope(
            data, ptr + 0x182, True, ticks_per_sec)
        inst.pf_nodes, inst.pf_flags = _parse_it_pf_envelope_raw(
            data, ptr + 0x1D4)
        insts.append(inst)
    return insts
|
||
|
||
|
||
def _parse_it_envelope(data: bytes, env_ptr: int, is_pan: bool,
                       ticks_per_sec: float) -> tuple:
    """Convert one IT volume or pan envelope block to Taud form.

    Returns ``(points, sustain_byte)``:

      points        list of exactly 12 ``(value, minifloat_idx)`` tuples, or
                    None when the block lies outside the file or the
                    envelope is not enabled.  `value` is 0..63 for volume
                    and 0..255 for pan; `minifloat_idx` encodes the time to
                    the next point (0 on the last real point and on padding).
      sustain_byte  bit7 = loop enabled, bit6 = sustain type (1 = breaks on
                    key-off, 0 = loops forever), bits[5:3] = end index,
                    bits[2:0] = start index; 0 = no loop.

    The loop indices are 3-bit fields, so a loop touching node 8 or higher
    cannot be expressed and is dropped.  Envelopes with more than 12 nodes
    are thinned to 12 evenly spaced nodes (first and last kept), which also
    drops the loop.

    IT has an envelope loop (runs forever) and a sustain loop (released on
    key-off); Taud can hold only one, and the sustain loop takes priority
    when both are present.
    """
    if env_ptr + 82 > len(data):
        return None, 0
    env_flags = data[env_ptr]
    if not (env_flags & 0x01):
        return None, 0                      # envelope not enabled

    node_count = max(1, min(data[env_ptr + 1], 25))
    # Node indices: envelope loop begin/end, then sustain loop begin/end.
    env_lb, env_le, sus_lb, sus_le = (data[env_ptr + k] for k in range(2, 6))

    # Choose which IT loop to carry over (priority: sustain > envelope).
    if env_flags & 0x04:
        has_loop, is_sustain = True, True
        use_lb, use_le = sus_lb, sus_le
    elif env_flags & 0x02:
        has_loop, is_sustain = True, False
        use_lb, use_le = env_lb, env_le
    else:
        has_loop, is_sustain = False, False
        use_lb = use_le = -1

    # IT nodes are 3 bytes each: int8 value (vol 0..64, pan -32..32), then a
    # little-endian uint16 tick position.
    nodes = []
    for n in range(node_count):
        at = env_ptr + 6 + n * 3
        if at + 2 >= len(data):
            break
        nodes.append(struct.unpack_from('<bH', data, at))

    if not nodes:
        return None, 0

    taud_slb = taud_sle = -1
    if len(nodes) <= 12:
        selected = list(nodes)
        # Sustain byte encodes 3-bit indices; loop nodes ≥8 cannot be referenced.
        limit = min(len(selected), 8)
        if has_loop and use_lb < limit and use_le < limit:
            taud_slb, taud_sle = use_lb, use_le
        elif has_loop and (use_lb >= 8 or use_le >= 8):
            vprint(f" loop indices ≥8 cannot be encoded in 3-bit sustain field")
    else:
        # Thin out to 12 evenly spaced nodes; the loop indices no longer apply.
        last = len(nodes) - 1
        selected = [nodes[round(k * last / 11)] for k in range(12)]
        if has_loop:
            vprint(f" loop indices lost due to decimation ({len(nodes)} nodes → 12)")

    def to_taud(val):
        # Rescale the IT node value to the Taud range and clamp.
        if is_pan:
            return min(255, max(0, round((val + 32) * 255 / 64)))
        return min(63, max(0, round(val * 63 / 64)))

    # Real points carry the delta time to their successor as a minifloat.
    points = []
    real = len(selected)
    for k, (val, tick) in enumerate(selected):
        if k + 1 < real:
            gap_sec = max(0.0, (selected[k + 1][1] - tick) / ticks_per_sec)
            mf_idx = _nearest_minifloat(gap_sec)
        else:
            mf_idx = 0                      # last real node: hold
        points.append((to_taud(val), mf_idx))

    # Pad up to 12 entries by repeating the last real value with offset 0.
    while len(points) < 12:
        points.append((points[-1][0], 0))

    if taud_slb >= 0 and taud_sle >= 0:
        t_bit = 0x40 if is_sustain else 0x00
        sus_byte = 0x80 | t_bit | ((taud_sle & 7) << 3) | (taud_slb & 7)
    else:
        sus_byte = 0

    return points, sus_byte
|
||
|
||
|
||
def _parse_it_pf_envelope_raw(data: bytes, env_ptr: int) -> tuple:
|
||
"""Parse the IT pitch/filter envelope keeping all nodes (no decimation).
|
||
|
||
Returns (nodes, flags) where:
|
||
nodes: list of (int8 value, uint16 tick) tuples (up to 25 entries),
|
||
values in -32..+32 semitones (pitch) or filter modulation units.
|
||
flags: dict {enabled, has_env_loop, has_sus_loop, lpb, lpe, slb, sle,
|
||
is_filter, carry}
|
||
Returns (None, None) if the envelope is not enabled or out of range.
|
||
"""
|
||
if env_ptr + 82 > len(data):
|
||
return None, None
|
||
flags_byte = data[env_ptr]
|
||
if not (flags_byte & 0x01):
|
||
return None, None
|
||
num_nodes = max(1, min(data[env_ptr + 1], 25))
|
||
flags = {
|
||
'enabled': True,
|
||
'has_env_loop': bool(flags_byte & 0x02),
|
||
'has_sus_loop': bool(flags_byte & 0x04),
|
||
'carry': bool(flags_byte & 0x08),
|
||
'is_filter': bool(flags_byte & 0x80),
|
||
'lpb': data[env_ptr + 2],
|
||
'lpe': data[env_ptr + 3],
|
||
'slb': data[env_ptr + 4],
|
||
'sle': data[env_ptr + 5],
|
||
}
|
||
nodes = []
|
||
for n in range(num_nodes):
|
||
nptr = env_ptr + 6 + n * 3
|
||
if nptr + 2 >= len(data):
|
||
break
|
||
val = struct.unpack_from('b', data, nptr)[0]
|
||
tick = struct.unpack_from('<H', data, nptr + 1)[0]
|
||
nodes.append((val, tick))
|
||
if not nodes:
|
||
return None, None
|
||
# Clamp loop indices into valid range
|
||
for k in ('lpb', 'lpe', 'slb', 'sle'):
|
||
flags[k] = min(flags[k], len(nodes) - 1)
|
||
return nodes, flags
|
||
|
||
|
||
def _env_value_at(t_tick: float, nodes: list, flags: dict) -> float:
|
||
"""Return interpolated envelope value at time `t_tick` (IT envelope ticks).
|
||
|
||
Honours env-loop wrap. Sus-loop is treated as 'play through once then hold
|
||
last value' (no key-off model). Returns nodes[0].value if t_tick is before
|
||
the first node, nodes[-1].value if past the last node and no env loop.
|
||
"""
|
||
if not nodes:
|
||
return 0.0
|
||
first_tick = nodes[0][1]
|
||
if t_tick <= first_tick:
|
||
return float(nodes[0][0])
|
||
|
||
if flags['has_env_loop']:
|
||
lpb_tick = nodes[flags['lpb']][1]
|
||
lpe_tick = nodes[flags['lpe']][1]
|
||
loop_span = lpe_tick - lpb_tick
|
||
if loop_span > 0 and t_tick >= lpe_tick:
|
||
t_tick = lpb_tick + ((t_tick - lpb_tick) % loop_span)
|
||
elif loop_span <= 0 and t_tick >= lpe_tick:
|
||
return float(nodes[flags['lpe']][0])
|
||
|
||
last_tick = nodes[-1][1]
|
||
if t_tick >= last_tick:
|
||
return float(nodes[-1][0])
|
||
|
||
# Linear interpolate between bracketing nodes
|
||
for k in range(len(nodes) - 1):
|
||
a_val, a_t = nodes[k]
|
||
b_val, b_t = nodes[k + 1]
|
||
if a_t <= t_tick <= b_t:
|
||
if b_t == a_t:
|
||
return float(a_val)
|
||
frac = (t_tick - a_t) / (b_t - a_t)
|
||
return a_val + (b_val - a_val) * frac
|
||
return float(nodes[-1][0])
|
||
|
||
|
||
# ── IT pattern parser ─────────────────────────────────────────────────────────
|
||
|
||
class ITRow:
    """One unpacked IT pattern cell (a single channel at a single row)."""

    __slots__ = ('note', 'inst', 'vol', 'effect', 'effect_arg', 'volcol',
                 'pan_set', 'aux_effect')

    def __init__(self):
        # "Nothing stored" markers:
        #   note   : -1 = empty, 0-119 = pitch, otherwise an IT_NOTE_* code
        #   vol    : -1 = not set
        #   volcol : raw IT vol-col byte, -1 = not set
        self.note = self.vol = self.volcol = -1
        # inst is 1-based, so 0 means "no instrument"; effect 0 means none.
        self.inst = self.effect = self.effect_arg = 0
        # pan_set    : 0..63 from vol-col, or None
        # aux_effect : (cmd, arg) pair, or None
        self.pan_set = self.aux_effect = None
|
||
|
||
def _parse_one_pattern(data: bytes, ptr: int) -> tuple:
    """Unpack one IT packed pattern into a per-channel grid of ITRow cells.

    Args:
        data: the whole module file.
        ptr: file offset of the pattern header (0 means "no pattern").

    Returns:
        (grid, row_count) where grid[channel][row] is an ITRow and there are
        always 64 channels.  A null or out-of-file pointer yields an empty
        pattern of PATTERN_ROWS rows.

    The packed stream is decoded per the mask bits handled below: 0x01 note,
    0x02 instrument, 0x04 vol-column, 0x08 command+argument are read from
    the stream; 0x10/0x20/0x40/0x80 reuse the channel's last stored value.
    Decoding stops quietly (leaving the remaining cells empty) if the
    stream ends early, so a truncated or corrupt file cannot raise
    IndexError.
    """
    if ptr == 0 or ptr + 8 > len(data):
        return [[ITRow() for _ in range(PATTERN_ROWS)] for _ in range(64)], PATTERN_ROWS

    data_len, row_count = struct.unpack_from('<HH', data, ptr)
    pos = ptr + 8
    # Do not trust the declared packed length beyond the end of the file.
    end = min(pos + data_len, len(data))

    grid = [[ITRow() for _ in range(row_count)] for _ in range(64)]

    # Per-channel last-data memory
    last_mask = [0] * 64
    last_note = [0] * 64
    last_inst = [0] * 64
    last_volcol = [0] * 64
    last_cmd = [0] * 64
    last_arg = [0] * 64

    row = 0
    while row < row_count and pos < end:
        chan_byte = data[pos]
        pos += 1
        if chan_byte == 0:
            # End-of-row marker.
            row += 1
            continue

        ch = (chan_byte - 1) & 63
        if chan_byte & 0x80:
            # A fresh mask byte follows; otherwise the previous one is reused.
            if pos >= end:
                break
            last_mask[ch] = data[pos]
            pos += 1
        mask = last_mask[ch]

        # Number of payload bytes this mask will consume.  Checked against
        # the real buffer size so a short file ends decoding instead of
        # raising; well-formed data is never affected.
        needed = (bool(mask & 0x01) + bool(mask & 0x02)
                  + bool(mask & 0x04) + 2 * bool(mask & 0x08))
        if pos + needed > len(data):
            break

        cell = grid[ch][row]

        if mask & 0x01:
            last_note[ch] = data[pos]
            pos += 1
        if mask & 0x02:
            last_inst[ch] = data[pos]
            pos += 1
        if mask & 0x04:
            last_volcol[ch] = data[pos]
            pos += 1
        if mask & 0x08:
            last_cmd[ch] = data[pos]
            last_arg[ch] = data[pos + 1]
            pos += 2

        # Low nibble = freshly read, high nibble = repeat last value.
        if mask & 0x11:
            cell.note = last_note[ch]
        if mask & 0x22:
            cell.inst = last_inst[ch]
        if mask & 0x44:
            cell.volcol = last_volcol[ch]
        if mask & 0x88:
            cell.effect = last_cmd[ch]
            cell.effect_arg = last_arg[ch]

    return grid, row_count
|
||
|
||
def parse_patterns(data: bytes, h: ITHeader) -> list:
    """Decode every pattern referenced by the header's pointer table.

    Returns one (grid, row_count) tuple per entry of `h.pat_ptrs`, in the
    same order.
    """
    return [_parse_one_pattern(data, pat_ptr) for pat_ptr in h.pat_ptrs]
|
||
|
||
|
||
# ── Note encoding (IT linear 0-119 → Taud pitch units) ───────────────────────
|
||
|
||
def encode_note_it(it_note: int) -> int:
    """Translate a raw IT note value into a Taud note word.

    Note-off and note-fade both become NOTE_KEYOFF, note-cut becomes
    NOTE_CUT, pitches 0-119 become a pitch value, and anything else is
    NOTE_NOP.
    """
    if it_note in (IT_NOTE_OFF, IT_NOTE_FADE):
        return NOTE_KEYOFF
    if it_note == IT_NOTE_CUT:
        return NOTE_CUT
    if not 0 <= it_note <= 119:
        return NOTE_NOP

    # IT note 60 (C-5) is anchored to TAUD_C3; 4096 pitch units per octave,
    # so one semitone is 4096/12 units.
    pitch = round(TAUD_C3 + (it_note - 60) * 4096 / 12)
    # Keep the result inside 1..0xFFFD.
    return min(0xFFFD, max(1, pitch))
|
||
|
||
|
||
# ── Vol-column decoder ────────────────────────────────────────────────────────
|
||
|
||
def decode_volcol(vc: int):
    """Classify a raw IT volume-column byte.

    Returns a 4-tuple (vol_sel, vol_value, pan_set, aux_effect):
      vol_sel / vol_value : selector and value for the Taud volume column
                            ((SEL_FINE, 0) when the byte carries no volume
                            information),
      pan_set             : 0..63 pan value, or None,
      aux_effect          : (effect, arg) pair to be forwarded to the main
                            effect slot, or None.
    A negative `vc` means "no vol-column byte" and yields the idle tuple.
    """
    idle = (SEL_FINE, 0, None, None)

    if vc < 0:  # not set
        return idle

    if vc <= VC_VOL_HI:
        # Plain volume, clipped to the 6-bit Taud range.
        return SEL_SET, min(vc, 0x3F), None, None

    if VC_FVUP_LO <= vc <= VC_FVUP_HI:
        step = vc - VC_FVUP_LO + 1
        return SEL_FINE, (step & 0x1F) | 0x20, None, None  # fine up

    if VC_FVDN_LO <= vc <= VC_FVDN_HI:
        step = vc - VC_FVDN_LO + 1
        return SEL_FINE, step & 0x1F, None, None  # fine down

    if VC_VUP_LO <= vc <= VC_VUP_HI:
        return SEL_UP, vc - VC_VUP_LO + 1, None, None

    if VC_VDN_LO <= vc <= VC_VDN_HI:
        return SEL_DOWN, vc - VC_VDN_LO + 1, None, None

    if VC_PDN_LO <= vc <= VC_PDN_HI:
        # Pitch slide down: each vol-column step is worth 4 effect units.
        amount = 4 * (vc - VC_PDN_LO + 1)
        return SEL_FINE, 0, None, (EFF_E, amount & 0xFF)

    if VC_PUP_LO <= vc <= VC_PUP_HI:
        amount = 4 * (vc - VC_PUP_LO + 1)
        return SEL_FINE, 0, None, (EFF_F, amount & 0xFF)

    if VC_PAN_LO <= vc <= VC_PAN_HI:
        # Rescale the vol-column pan offset by 63/64, capped at 63.
        raw_pan = vc - VC_PAN_LO
        return SEL_FINE, 0, min(0x3F, round(raw_pan * 63 / 64)), None

    if VC_TPORTA_LO <= vc <= VC_TPORTA_HI:
        speed = VC_TPORTA_TABLE[vc - VC_TPORTA_LO]
        return SEL_FINE, 0, None, (EFF_G, speed & 0xFF)

    if VC_VIB_LO <= vc <= VC_VIB_HI:
        # Only a depth nibble is produced; the speed nibble stays 0.
        depth = vc - VC_VIB_LO + 1
        return SEL_FINE, 0, None, (EFF_H, depth & 0x0F)

    return idle
|
||
|
||
|
||
# ── Effect translator ─────────────────────────────────────────────────────────
|
||
|
||
def encode_effect_it(cmd: int, arg: int, ch: int = 0, row: int = 0) -> tuple:
    """Translate one IT effect into its Taud representation.

    Returns (taud_op, taud_arg16, vol_override, pan_override).  The two
    overrides are either None or a (selector, value) pair meant for the
    Taud volume / pan column.  `ch` and `row` are only used in the verbose
    messages printed when an effect is dropped.

    Differs from s3m2taud.encode_effect in:
      - Cxx: binary row number, not BCD
      - V: IT global vol 0-128 scaled ×2
      - X: IT full 8-bit pan → 6-bit
      - S6x, S7x, SAx, SFx handled (mostly dropped)
    """
    nothing = (TOP_NONE, 0, None, None)
    if cmd == 0:
        return nothing

    hi = (arg >> 4) & 0xF
    lo = arg & 0xF
    # Argument byte moved into the high byte of the 16-bit Taud argument.
    high_byte = (arg & 0xFF) << 8

    if cmd == EFF_A:
        return (TOP_A, high_byte, None, None) if arg != 0 else nothing

    if cmd == EFF_B:
        return (TOP_B, arg & 0xFF, None, None)

    if cmd == EFF_C:
        # IT stores the target row in binary (not BCD like ST3); rows that
        # do not exist in a Taud pattern fall back to row 0.
        target = arg & 0xFF
        if target >= PATTERN_ROWS:
            target = 0
        return (TOP_C, target & 0xFF, None, None)

    if cmd == EFF_D:
        return (TOP_D, high_byte, None, None)

    if cmd in (EFF_E, EFF_F):
        slide_op = TOP_E if cmd == EFF_E else TOP_F
        if lo > 0 and hi in (0xE, 0xF):
            # Fine / extra-fine slide: flagged with 0xF in the top nibble.
            return (slide_op, 0xF000 | (round(lo * 16 / 3) & 0xFFF), None, None)
        return (slide_op, round(arg * 64 / 3) & 0xFFFF, None, None)

    if cmd == EFF_G:
        return (TOP_G, round(arg * 64 / 3) & 0xFFFF, None, None)

    if cmd in (EFF_H, EFF_I, EFF_R, EFF_U):
        nibble_op = {EFF_H: TOP_H, EFF_I: TOP_I, EFF_R: TOP_R, EFF_U: TOP_U}[cmd]
        # Each nibble is repeated (x → xx) to fill a full byte.
        return (nibble_op, ((hi * 0x11) << 8) | (lo * 0x11), None, None)

    if cmd == EFF_J:
        return (TOP_J, (J_SEMI_TABLE[hi] << 8) | J_SEMI_TABLE[lo], None, None)

    if cmd == EFF_K:
        # Continue vibrato; the volume slide goes to the volume column.
        return (TOP_H, 0x0000, d_arg_to_col(arg), None)

    if cmd == EFF_L:
        # Continue tone portamento; the volume slide goes to the volume column.
        return (TOP_G, 0x0000, d_arg_to_col(arg), None)

    if cmd == EFF_M:
        return (TOP_NONE, 0, (SEL_SET, min(arg, 0x3F)), None)

    if cmd == EFF_N:
        return (TOP_NONE, 0, d_arg_to_col(arg), None)

    if cmd == EFF_O:
        return (TOP_O, high_byte, None, None)

    if cmd == EFF_P:
        return (TOP_NONE, 0, None, d_arg_to_col(arg))

    if cmd == EFF_Q:
        return (TOP_Q, high_byte, None, None)

    if cmd == EFF_S:
        sub, val = hi, lo
        if sub in (0x1, 0x2, 0x3, 0x4, 0x5, 0xB, 0xC, 0xD, 0xE):
            return (TOP_S, (sub << 12) | (val << 8), None, None)
        if sub == 0x8:
            # IT S8x: 4-bit pan → nibble-repeat to 8 bits → 6-bit SEL_SET pan
            wide_pan = (val << 4) | val
            return (TOP_NONE, 0, None,
                    (SEL_SET, min(0x3F, round(wide_pan * 63 / 255))))
        # S6x, SAx and SFx are dropped with a verbose note; S7x (NNA /
        # envelope), S9x (sound control) and anything else silently.
        reason = {0x6: "tick delay", 0xA: "high offset", 0xF: "MIDI macro"}.get(sub)
        if reason is not None:
            vprint(f" dropped S{sub:X}{val:X} ({reason}) at ch{ch} row{row}")
        return nothing

    if cmd == EFF_T:
        if arg >= 0x20:
            # Absolute tempo, stored offset by 0x18 in the high byte.
            return (TOP_T, ((arg - 0x18) & 0xFF) << 8, None, None)
        return (TOP_T, arg & 0xFF, None, None)

    if cmd == EFF_V:
        # IT global vol is 0-128; Taud uses 0-255 → ×2
        scaled = min(arg * 2, 0xFF)
        return (TOP_V, (scaled & 0xFF) << 8, None, None)

    if cmd == EFF_W:
        vprint(f" dropped W{arg:02X} (global vol slide) at ch{ch} row{row}")
        return nothing

    if cmd == EFF_X:
        # The full 8-bit IT pan argument is rescaled to the 6-bit Taud range.
        return (TOP_NONE, 0, None, (SEL_SET, min(0x3F, round(arg * 63 / 255))))

    if cmd == EFF_Y:
        return (TOP_Y, ((hi * 0x11) << 8) | (lo * 0x11), None, None)

    if cmd == EFF_Z:
        vprint(f" dropped Z{arg:02X} (MIDI macro) at ch{ch} row{row}")
        return nothing

    return nothing
|
||
|
||
|
||
# ── IT recall resolution ──────────────────────────────────────────────────────
|
||
|
||
def resolve_it_recalls(patterns_rows: list, order_list: list,
                       num_channels: int, link_gef: bool,
                       old_effects: bool = False,
                       initial_global_vol: int = 128) -> None:
    """Replace zero-argument "recall" effects with their remembered value.

    Patterns are visited in order-list order (stopping at the first order
    >= IT_ORD_END, skipping orders with no pattern) and the cells are
    modified in place.  Only effects listed in IT_MEM_EFFECTS take part.

    IT effect memory groups:
      - D / K / L: shared vol-slide slot
      - E / F: shared pitch-slide slot, which G joins when link_gef is set
      - G: own slot when link_gef is not set
      - All others: private slots

    old_effects=True (IT_FLAG_OLD_EFFECTS): a zero-argument E, F or J is an
    ST3-style stop rather than a recall; the effect is cleared from the
    cell.  All other effects still recall normally.

    V memory is primed with initial_global_vol so that a V with argument 0
    before any other V resolves to the header's global volume.
    """
    stop_on_zero = frozenset({EFF_E, EFF_F, EFF_J})

    # Effect → memory slot; effects not listed here use their own slot.
    shared_slot = {
        EFF_D: EFF_D, EFF_K: EFF_D, EFF_L: EFF_D,
        EFF_E: EFF_E, EFF_F: EFF_E,
    }
    if link_gef:
        shared_slot[EFF_G] = EFF_E

    # memory[ch][slot] = last non-zero argument seen on that channel
    memory = [{EFF_V: initial_global_vol} for _ in range(num_channels)]

    for order in order_list:
        if order >= IT_ORD_END:
            break
        if order >= len(patterns_rows):
            continue
        grid, rows = patterns_rows[order]
        usable_channels = min(num_channels, len(grid))
        for r in range(rows):
            for ch in range(usable_channels):
                cell = grid[ch][r]
                effect = cell.effect
                if effect not in IT_MEM_EFFECTS:
                    continue
                slot = shared_slot.get(effect, effect)
                if cell.effect_arg != 0:
                    memory[ch][slot] = cell.effect_arg
                elif old_effects and effect in stop_on_zero:
                    # Stop, not recall: Taud would recall on a zero
                    # argument, so the effect is removed altogether.
                    cell.effect = 0
                else:
                    cell.effect_arg = memory[ch].get(slot, 0)
|
||
|
||
|
||
# ── Pattern row-chunk splitter ────────────────────────────────────────────────
|
||
|
||
def split_patterns(patterns_rows: list):
    """Cut every source pattern into chunks of PATTERN_ROWS rows.

    Args:
        patterns_rows: list of (grid, row_count) tuples, grid[channel][row].

    Returns (chunks, chunk_map):
      chunks: flat list of chunk grids; each has 64 channels of exactly
              PATTERN_ROWS ITRow cells.  Real cells are the SAME objects as
              in the source grid (not copies); padding cells are new empty
              ITRow instances.
      chunk_map: per source pattern, the list of its chunk indices
                 (empty for a zero-row pattern).

    Any chunk that holds fewer than PATTERN_ROWS real rows — a short
    pattern, or the final chunk of a split pattern — gets a pattern break
    (effect C, argument 0) on channel 0 at its last real row so playback
    skips the padding.  The break is only placed when that cell has no
    effect yet; full chunks need no break and are left untouched.
    """
    chunks = []
    chunk_map = []

    for pi, (grid, rows) in enumerate(patterns_rows):
        if rows == 0:
            chunk_map.append([])
            continue

        n_chunks = (rows + PATTERN_ROWS - 1) // PATTERN_ROWS
        if n_chunks > 1:
            vprint(f" pattern {pi}: {rows} rows → {n_chunks} chunks")

        pat_chunks = []
        for k in range(n_chunks):
            r0 = k * PATTERN_ROWS
            r1 = min(r0 + PATTERN_ROWS, rows)

            # Build a full-height grid for this chunk, padding with empty rows.
            chunk_grid = []
            for ch in range(64):
                src = grid[ch] if ch < len(grid) else []
                # Guard on the SOURCE row index, not the chunk-local one.
                limit = min(r1, len(src))
                chunk_grid.append([
                    src[sr] if sr < limit else ITRow()
                    for sr in range(r0, r0 + PATTERN_ROWS)
                ])

            real_rows = r1 - r0
            if real_rows < PATTERN_ROWS:
                # Padding follows the last real row: break out of the chunk.
                last_cell = chunk_grid[0][real_rows - 1]
                if last_cell.effect == 0:
                    last_cell.effect = EFF_C
                    last_cell.effect_arg = 0

            pat_chunks.append(len(chunks))
            chunks.append(chunk_grid)

        chunk_map.append(pat_chunks)

    return chunks, chunk_map
|
||
|
||
|
||
def _remap_bc_effects(chunks: list, chunk_map: list,
                      order_list: list, it_ord_to_taud_cue: dict,
                      num_channels: int) -> None:
    """Rewrite B/C effects in place for the chunked pattern layout.

    Every B (position jump) argument is looked up in `it_ord_to_taud_cue`
    (falling back to the original value) and stored back masked to 8 bits.
    A C (pattern break) found in a non-final chunk of a split pattern asks
    _find_post_pat_cue for the cue to jump to; when one is returned, a
    matching (EFF_B, cue) pair is stored in the cell's `aux_effect`.
    Only the first `num_channels` channels of each chunk are visited.
    """
    # chunk index → (source pattern index, position within it, chunk count)
    origin = {
        chunk_idx: (pat_idx, position, len(pat_chunks))
        for pat_idx, pat_chunks in enumerate(chunk_map)
        for position, chunk_idx in enumerate(pat_chunks)
    }

    for chunk_idx, chunk_grid in enumerate(chunks):
        pat_idx, position, total = origin.get(chunk_idx, (0, 0, 1))
        more_chunks_follow = position < total - 1

        for ch in range(min(num_channels, len(chunk_grid))):
            for cell in chunk_grid[ch]:
                if cell.effect == EFF_B:
                    target = cell.effect_arg
                    cell.effect_arg = it_ord_to_taud_cue.get(target, target) & 0xFF
                    continue
                if cell.effect != EFF_C or not more_chunks_follow:
                    continue
                # The break must also skip the remaining chunks of this
                # pattern; the jump is parked in aux_effect for the builder.
                skip_cue = _find_post_pat_cue(pat_idx, order_list, chunk_map,
                                              it_ord_to_taud_cue)
                if skip_cue is not None:
                    cell.aux_effect = (EFF_B, skip_cue & 0xFF)
|
||
|
||
def _find_post_pat_cue(pi: int, order_list: list, chunk_map: list,
|
||
it_ord_to_taud_cue: dict):
|
||
"""Return the Taud cue index that follows ALL chunks of pattern pi in the order list."""
|
||
for taud_cue, it_ord in it_ord_to_taud_cue.items():
|
||
# Find first Taud cue after the last chunk of pi
|
||
pass
|
||
# Simpler: walk the Taud cue list (we'll compute it in assemble_taud)
|
||
# Return None for now — assemble_taud will do a second pass.
|
||
return None
|
||
|
||
|
||
# ── Sample / instrument bin (same as s3m2taud) ────────────────────────────────
|
||
|
||
def build_sample_inst_bin_it(samples_or_proxy: list,
                             envelopes_by_slot: dict = None) -> tuple:
    """Pack sample PCM and the instrument table into one binary blob.

    samples_or_proxy: list of ITSample | None, indexed 1-based (index 0 unused).

    envelopes_by_slot: optional dict mapping taud_slot →
      (vol_env, vol_sus, pan_env, pan_sus, inst_gv), where vol_env/pan_env
      are lists of 12 (value, minifloat_idx) tuples (or None), vol_sus /
      pan_sus are the matching sustain bytes and inst_gv is the instrument
      global volume (0..255, byte 15).

    If the PCM data does not fit in SAMPLEBIN_SIZE, every sample is
    resampled by the same ratio; the sample objects are updated in place
    (data, length, loop points, c5_speed).  Anything that still does not
    fit is truncated or dropped with a verbose warning.

    Instrument record layout written here (64 bytes per slot):
      +0 pointer low 16 bits, +2 length, +4 c5 speed, +6 zero,
      +8 loop start, +10 loop end, +12 flags (pointer high bits << 4 |
      loop mode), +13 vol sustain, +14 pan sustain, +15 global volume,
      +16..+39 volume envelope, +40..+63 pan envelope.
    The volume and pan envelopes are written independently of each other:
    a missing volume envelope becomes a single point holding the sample's
    default volume, a missing pan envelope becomes 12 centred hold points.

    Returns (bin_bytes, offsets_dict): sample bin followed by instrument
    bin, and the sample index → sample-bin offset mapping.
    """
    pcm_list = [(i, s) for i, s in enumerate(samples_or_proxy)
                if s is not None and s.sample_data]

    total = sum(len(s.sample_data) for _, s in pcm_list)
    ratio = 1.0
    if total > SAMPLEBIN_SIZE:
        ratio = SAMPLEBIN_SIZE / total
        vprint(f" info: sample bin overflow ({total} bytes); resampling all by {ratio:.4f}")
        for _, s in pcm_list:
            new_data = resample_linear(s.sample_data, ratio)
            s.sample_data = new_data
            s.length = len(new_data)
            s.loop_beg = max(0, int(s.loop_beg * ratio))
            s.loop_end = max(0, min(int(s.loop_end * ratio), s.length))
            # Fewer samples per second of audio → lower playback rate.
            s.c5_speed = max(1, int(s.c5_speed * ratio))

    sample_bin = bytearray(SAMPLEBIN_SIZE)
    offsets = {}
    pos = 0
    for idx, s in pcm_list:
        n = min(len(s.sample_data), SAMPLEBIN_SIZE - pos)
        if n <= 0:
            vprint(f" warning: sample bin full, dropping '{s.name}'")
            offsets[idx] = 0
            s.length = 0
            continue
        sample_bin[pos:pos+n] = s.sample_data[:n]
        offsets[idx] = pos
        if n < len(s.sample_data):
            vprint(f" warning: '{s.name}' truncated {len(s.sample_data)} → {n}")
            s.length = n
            s.loop_end = min(s.loop_end, n)
        pos += n

    inst_bin = bytearray(INSTBIN_SIZE)
    for i, s in enumerate(samples_or_proxy):
        taud_idx = i  # list index doubles as the Taud slot; slot 0 unused
        if i == 0 or i >= 256 or s is None:
            continue
        ptr = offsets.get(i, 0)
        ptr_lo = ptr & 0xFFFF
        ptr_hi = ptr >> 16
        s_len = min(s.length, 65535)
        c2spd = min(s.c5_speed, 65535)
        ls = min(s.loop_beg, 65535)
        le = min(s.loop_end, 65535)
        if s.has_loop and (s.flags & IT_SMP_PINGPONG):
            loop_mode = 2  # backandforth
        elif s.has_loop:
            loop_mode = 1  # forward loop
        else:
            loop_mode = 0  # no loop
        flags_byte = (ptr_hi << 4) | (loop_mode & 0x3)

        base = taud_idx * 64
        struct.pack_into('<H', inst_bin, base + 0, ptr_lo)
        struct.pack_into('<H', inst_bin, base + 2, s_len)
        struct.pack_into('<H', inst_bin, base + 4, c2spd)
        struct.pack_into('<H', inst_bin, base + 6, 0)
        struct.pack_into('<H', inst_bin, base + 8, ls)
        struct.pack_into('<H', inst_bin, base + 10, le)
        inst_bin[base + 12] = flags_byte

        # Envelope data (12-point format: vol at +16..+39, pan at +40..+63)
        env_data = envelopes_by_slot.get(taud_idx) if envelopes_by_slot else None
        if env_data:
            vol_env, vol_sus, pan_env, pan_sus, inst_gv = env_data
        else:
            vol_env = pan_env = None
            vol_sus = pan_sus = 0
            inst_gv = 255  # full instrument global volume
        inst_bin[base + 15] = inst_gv & 0xFF

        if vol_env:
            inst_bin[base + 13] = vol_sus & 0xFF
            for k, (val, mf) in enumerate(vol_env[:12]):
                inst_bin[base + 16 + k*2] = val & 0xFF
                inst_bin[base + 16 + k*2 + 1] = mf & 0xFF
        else:
            # No volume envelope: single point at the sample's default volume.
            inst_bin[base + 16] = min(s.vol, 63)  # value 0-63
            inst_bin[base + 17] = 0               # offset 0 = hold

        if pan_env:
            # Written even when there is no volume envelope, so a pan-only
            # instrument keeps its envelope and sustain byte.
            inst_bin[base + 14] = pan_sus & 0xFF
            for k, (val, mf) in enumerate(pan_env[:12]):
                inst_bin[base + 40 + k*2] = val & 0xFF
                inst_bin[base + 40 + k*2 + 1] = mf & 0xFF
        else:
            for k in range(12):
                inst_bin[base + 40 + k*2] = 0x80      # pan centre
                inst_bin[base + 40 + k*2 + 1] = 0x00  # hold
        vprint(f" instrument[{taud_idx}] '{s.name}' ptr:{ptr} c5spd:{s.c5_speed}")

    return bytes(sample_bin) + bytes(inst_bin), offsets
|
||
|
||
|
||
# ── Pattern builder ───────────────────────────────────────────────────────────
|
||
|
||
def _it_default_pan(raw_pan: int) -> int:
|
||
"""Convert raw IT channel-pan byte to Taud 0..63."""
|
||
if raw_pan == 100: # surround → centre
|
||
return 31
|
||
p = raw_pan & 0x7F
|
||
return min(0x3F, round(p * 63 / 64))
|
||
|
||
def build_pattern_it(chunk_grid: list, ch_idx: int, default_pan: int,
                     inst_vols: dict) -> bytes:
    """Serialise one channel of a chunk grid into a Taud pattern.

    The result is PATTERN_BYTES long, 8 bytes per row: note (u16 LE),
    instrument, volume byte, pan byte, effect opcode, effect argument
    (u16 LE).  Volume and pan bytes hold a 6-bit value in the low bits and
    a 2-bit selector in the top bits.

    Args:
        chunk_grid: grid[channel][row] of ITRow cells.
        ch_idx: channel to serialise; an out-of-range index gives an
            all-empty pattern.
        default_pan: pan value written on row 0 when the row sets no pan.
        inst_vols: instrument number → default volume used on note
            triggers (0x3F when the instrument is not listed).

    Side effects: cells are updated in place — a vol-column aux effect is
    moved into an empty main effect slot, and a vol-column pan is stored
    in `pan_set`.
    """
    out = bytearray(PATTERN_BYTES)
    if ch_idx < len(chunk_grid):
        rows = chunk_grid[ch_idx]
    else:
        rows = [ITRow()] * PATTERN_ROWS

    last_inst = 0
    last_note_it = -1
    last_vol = None

    for r, cell in enumerate(rows[:PATTERN_ROWS]):
        # ── Vol-column: split into volume, pan and forwarded effect ─────────
        vs, vv, pan_from_vc, aux_eff = decode_volcol(cell.volcol)

        if aux_eff is not None:
            if cell.effect == 0:
                # Main effect slot is free: the vol-column effect moves in.
                cell.effect, cell.effect_arg = aux_eff
            else:
                vprint(f" ch{ch_idx} row{r}: dropped vol-col aux effect "
                       f"(main effect slot occupied)")

        if pan_from_vc is not None:
            cell.pan_set = pan_from_vc

        # ── Main effect ─────────────────────────────────────────────────────
        op, arg16, vol_override, pan_override = encode_effect_it(
            cell.effect, cell.effect_arg, ch_idx, r)

        # ── Note / instrument ───────────────────────────────────────────────
        note_taud = encode_note_it(cell.note) if cell.note >= 0 else NOTE_NOP
        if cell.inst > 0:
            last_inst = cell.inst
        note_triggers = 0 <= cell.note <= 119

        # ── Volume column ───────────────────────────────────────────────────
        # Priority: explicit vol-col volume > note-trigger default >
        # instrument-only retrigger recall > main-effect vol override >
        # vol-col slide (which is the no-op pair when nothing is set).
        if 0 <= cell.volcol <= VC_VOL_HI:
            vol_sel, vol_value = SEL_SET, min(cell.volcol, 0x3F)
        elif note_triggers and last_inst > 0:
            vol_sel, vol_value = SEL_SET, inst_vols.get(last_inst, 0x3F)
        elif (cell.inst > 0 and cell.note < 0
              and last_note_it >= 0 and last_vol is not None):
            # Instrument without a note: restate the last set volume.
            vol_sel, vol_value = SEL_SET, last_vol
        elif vol_override is not None:
            vol_sel, vol_value = vol_override
        else:
            vol_sel, vol_value = vs, vv

        if note_triggers:
            last_note_it = cell.note
        if vol_sel == SEL_SET:
            last_vol = vol_value

        # ── Pan column ──────────────────────────────────────────────────────
        if cell.pan_set is not None:
            pan_sel, pan_value = SEL_SET, cell.pan_set
        elif pan_override is not None:
            pan_sel, pan_value = pan_override
        elif r == 0:
            pan_sel, pan_value = SEL_SET, default_pan & 0x3F
        else:
            pan_sel, pan_value = SEL_FINE, 0

        # ── Chunk-skip jump parked on a pattern-break cell ──────────────────
        # A cell holds only one effect, so when a B is attached to a C the
        # B replaces it (best effort: the break's row argument is lost).
        if cell.aux_effect is not None:
            aux_cmd, aux_arg = cell.aux_effect
            if aux_cmd == EFF_B and op == TOP_C:
                op = TOP_B
                arg16 = aux_arg & 0xFF

        vol_byte = (vol_value & 0x3F) | ((vol_sel & 0x3) << 6)
        pan_byte = (pan_value & 0x3F) | ((pan_sel & 0x3) << 6)
        # The instrument is only stated on rows that trigger or name one.
        taud_inst = last_inst & 0xFF if (note_triggers or cell.inst > 0) else 0

        struct.pack_into('<HBBBBH', out, r * 8,
                         note_taud, taud_inst, vol_byte, pan_byte,
                         op & 0xFF, arg16 & 0xFFFF)

    return bytes(out)
|
||
|
||
|
||
# ── Main assembly ─────────────────────────────────────────────────────────────
|
||
|
||
def find_initial_bpm_speed(patterns_rows: list, order_list: list,
                           default_speed: int, default_tempo: int) -> tuple:
    """Determine the speed and tempo in force at the very start of the song.

    Starts from the header defaults (falling back to speed 6 / tempo 125
    when they are zero) and then inspects row 0 of the first playable
    pattern in the order list: an A effect with a non-zero argument
    overrides the speed, a T effect with an argument of 0x20 or more
    overrides the tempo.  T arguments below 0x20 are tempo slides, not
    absolute tempos, and are therefore ignored here.

    Returns:
        (speed, tempo)
    """
    speed = default_speed or 6
    tempo = default_tempo or 125
    for order in order_list:
        if order >= IT_ORD_END:
            break
        if order >= len(patterns_rows):
            continue
        grid, _rows = patterns_rows[order]
        for ch_rows in grid:
            if not ch_rows:
                continue
            cell = ch_rows[0]
            if cell.effect == EFF_A and cell.effect_arg > 0:
                speed = cell.effect_arg
            elif cell.effect == EFF_T and cell.effect_arg >= 0x20:
                tempo = cell.effect_arg
        # Only the first playable pattern defines the initial state.
        break
    return speed, tempo
|
||
|
||
def _active_channels(h: ITHeader, patterns_rows: list) -> list:
    """List the channels worth converting, in ascending order.

    A channel qualifies when it is not muted (bit 7 of its `h.chnl_pan`
    entry clear) and at least one cell in any pattern carries a note, an
    instrument or an effect.  At most NUM_VOICES channels are returned;
    the excess is dropped with a verbose warning.
    """
    muted = {idx for idx, pan in enumerate(h.chnl_pan)
             if pan & 0x80 or pan == 0xC0}

    in_use = set()
    for grid, _rows in patterns_rows:
        # Only the first 64 channels of a grid are considered.
        for ch, cells in enumerate(grid[:64]):
            if any(c.note >= 0 or c.inst > 0 or c.effect != 0 for c in cells):
                in_use.add(ch)

    active = [ch for ch in range(64) if ch in in_use and ch not in muted]
    if len(active) > NUM_VOICES:
        vprint(f" warning: {len(active)} active channels; capping at {NUM_VOICES}")
        active = active[:NUM_VOICES]
    return active
|
||
|
||
def assemble_taud(h: ITHeader, samples: list, instruments: list,
                  patterns_rows: list, decompress: bool,
                  no_pf_envelope: bool = False) -> bytes:
    """Assemble a complete Taud file image from parsed IT data.

    The result is the concatenation of: file header, gzip-compressed
    sample/instrument bin, song table entry, de-duplicated pattern bin and
    cue sheet.

    Args:
        h: Parsed IT header.
        samples: Parsed IT samples (0-based; entries may be None).
        instruments: Parsed IT instruments (0-based; entries may be None).
            Only consulted when ``h.use_instruments`` is true.
        patterns_rows: Per-pattern ``(grid, row_count)`` pairs.  Passed to
            ``resolve_it_recalls`` first — presumably rewritten in place.
        decompress: Accepted for interface compatibility; not read here.
        no_pf_envelope: When true, pitch/filter envelopes are not baked
            onto per-instrument sample copies.

    Returns:
        The Taud file contents.

    Raises:
        SystemExit: If no channel is active, or if cues × channels exceeds
            NUM_PATTERNS_MAX.
    """
    # ── Resolve IT recalls ───────────────────────────────────────────────────
    vprint(" resolving IT recalls…")
    resolve_it_recalls(patterns_rows, h.order_list, 64, h.link_gef,
                       old_effects=h.old_effects,
                       initial_global_vol=h.global_vol)

    # ── Check SBx chunk crossing (warn only) ─────────────────────────────────
    for pi, (grid, rows) in enumerate(patterns_rows):
        if rows <= PATTERN_ROWS:
            continue
        for ch in range(min(64, len(grid))):
            loop_start_chunk = None
            for r, cell in enumerate(grid[ch]):
                if cell.effect != EFF_S:
                    continue
                sub = (cell.effect_arg >> 4) & 0xF
                val = cell.effect_arg & 0xF
                k = r // PATTERN_ROWS
                if sub == 0xB and val == 0:
                    loop_start_chunk = k
                elif sub == 0xB and val > 0:
                    if loop_start_chunk is not None and k != loop_start_chunk:
                        vprint(f" warning: pattern {pi} ch{ch}: SBx crosses "
                               f"chunk boundary (loops may misbehave)")
                        # One warning per channel is enough.
                        break

    # ── Split patterns into 64-row chunks ────────────────────────────────────
    vprint(" splitting patterns…")
    chunks, chunk_map = split_patterns(patterns_rows)

    # ── Choose active channels ───────────────────────────────────────────────
    active_channels = _active_channels(h, patterns_rows)
    C = len(active_channels)
    if C == 0:
        sys.exit("error: no active channels found")

    # ── Build the ordered cue list ───────────────────────────────────────────
    # Expand order list: each IT order → sequence of chunk indices for that pattern
    taud_cue_list = []        # chunk index per Taud cue (source patterns, already chunked)
    it_ord_to_taud_cue = {}   # first taud cue for IT order i

    for oi, order in enumerate(h.order_list):
        if order == IT_ORD_END:
            break
        if order == IT_ORD_SKIP:
            continue
        if order >= len(chunk_map):
            continue
        it_ord_to_taud_cue.setdefault(oi, len(taud_cue_list))
        taud_cue_list.extend(chunk_map[order])

    # ── Remap B/C effects ────────────────────────────────────────────────────
    _remap_bc_effects(chunks, chunk_map, h.order_list, it_ord_to_taud_cue,
                      len(active_channels))

    # ── Build sample proxy list (0-indexed, slot 0 unused) ──────────────────
    # Taud only knows "instrument" slots (1-based, 8-bit).  In instrument mode
    # pattern cells carry IT instrument numbers, so each slot is resolved to
    # the instrument's canonical sample; otherwise samples are laid out in
    # order.
    if h.use_instruments:
        # Build a proxy sample list where Taud inst slot = IT inst index,
        # resolved to the canonical sample. Slot 0 unused.
        ticks_per_sec = max(h.initial_tempo * 2.0 / 5.0, 1.0)
        proxy = [None] * (max(len(instruments), 256) + 1)
        inst_vols = {}
        envelopes_by_slot = {}
        bake_count = 0
        for ii, inst in enumerate(instruments):
            taud_slot = ii + 1
            if taud_slot >= 256:
                break
            if inst is None:
                continue
            si = inst.canonical_sample - 1   # 0-based sample index
            if si < 0 or si >= len(samples) or samples[si] is None:
                continue
            src_smp = samples[si]
            if (not no_pf_envelope and inst.pf_nodes
                    and inst.pf_flags and inst.pf_flags.get('enabled')):
                baked = _bake_pf_envelope(src_smp, inst, ticks_per_sec)
                # The helper hands back the same object when nothing was baked.
                if baked is not src_smp:
                    bake_count += 1
                    mode = 'filter' if inst.pf_flags['is_filter'] else 'pitch'
                    vprint(f" baked pf envelope on inst[{ii+1}] '{inst.name}' "
                           f"(mode={mode}, src_len={src_smp.length}, "
                           f"out_len={baked.length})")
                    src_smp = baked
            proxy[taud_slot] = src_smp
            vol64 = min(inst.canonical_volume, 64)
            inst_vols[taud_slot] = min(vol64, 0x3F)
            # IT global volume range is 0..128; rescale to Taud's 0..255.
            inst_gv_255 = min(255, round(inst.gv * 255 / 128))
            envelopes_by_slot[taud_slot] = (
                inst.vol_envelope, inst.vol_env_sustain,
                inst.pan_envelope, inst.pan_env_sustain,
                inst_gv_255,
            )
        if bake_count:
            vprint(f" pf envelope baking: {bake_count} instrument(s)")
        sampleinst_raw, _ = build_sample_inst_bin_it(proxy, envelopes_by_slot)
    else:
        # Samples referenced directly; proxy is samples list (0-based, slot 0 unused)
        proxy = [None] + list(samples)
        inst_vols = {
            i + 1: min(s.vol, 0x3F)
            for i, s in enumerate(samples)
            if s is not None
        }
        sampleinst_raw, _ = build_sample_inst_bin_it(proxy)

    assert len(sampleinst_raw) == SAMPLEINST_SIZE

    # mtime=0 keeps the gzip stream free of a timestamp.
    compressed = gzip.compress(sampleinst_raw, compresslevel=9, mtime=0)
    comp_size = len(compressed)
    vprint(f" sample+inst bin: {SAMPLEINST_SIZE} → {comp_size} bytes (gzip)")

    # ── BPM / speed ──────────────────────────────────────────────────────────
    speed, tempo = find_initial_bpm_speed(patterns_rows, h.order_list,
                                          h.initial_speed, h.initial_tempo)
    # Tempo is stored as (tempo - 24) in a single byte, so the largest value
    # that survives the round trip is 24 + 255 = 279; 280 would wrap to 0.
    tempo = max(24, min(279, tempo))
    bpm_stored = (tempo - 24) & 0xFF
    vprint(f" initial speed={speed}, tempo={tempo} BPM")

    # ── Pattern bin ──────────────────────────────────────────────────────────
    vprint(" building pattern bin…")
    default_pans = [_it_default_pan(h.chnl_pan[ch]) for ch in active_channels]
    total_taud_pats = len(taud_cue_list) * C
    if total_taud_pats > NUM_PATTERNS_MAX:
        sys.exit(
            f"error: {len(taud_cue_list)} cues × {C} channels = "
            f"{total_taud_pats} > {NUM_PATTERNS_MAX} Taud pattern limit."
        )

    # One Taud pattern per (cue, voice), laid out cue-major.
    pat_bin = bytearray()
    for ci in taud_cue_list:
        cg = chunks[ci]
        for vi, ch in enumerate(active_channels):
            pat_bin += build_pattern_it(cg, ch, default_pans[vi], inst_vols)

    orig_count = total_taud_pats
    pat_bin, pat_remap, num_taud_pats = deduplicate_patterns(bytes(pat_bin), orig_count)
    vprint(f" patterns: {orig_count} → {num_taud_pats} unique "
           f"({orig_count - num_taud_pats} deduplicated)")

    # ── Cue sheet ────────────────────────────────────────────────────────────
    vprint(" building cue sheet…")
    song_offset = TAUD_HEADER_SIZE + comp_size + TAUD_SONG_ENTRY
    sheet = bytearray(NUM_CUES * CUE_SIZE)
    empty_cue = encode_cue([], 0)
    for c in range(NUM_CUES):
        sheet[c * CUE_SIZE:(c + 1) * CUE_SIZE] = empty_cue

    if len(taud_cue_list) > NUM_CUES:
        # Cues past the end of the sheet cannot be stored; say so rather than
        # dropping them silently.
        vprint(f" warning: {len(taud_cue_list)} cues; capping at {NUM_CUES}")

    last_active = -1
    for cue_idx in range(min(len(taud_cue_list), NUM_CUES)):
        base_pat = cue_idx * C
        pats = [pat_remap[base_pat + vi] for vi in range(C)]
        sheet[cue_idx * CUE_SIZE:(cue_idx + 1) * CUE_SIZE] = encode_cue(pats, 0)
        last_active = cue_idx

    # Flag byte 30 of the last used cue (cue 0 when there are none).
    # NOTE(review): presumably the end-of-song marker — confirm against the
    # Taud cue format.
    if last_active >= 0:
        sheet[last_active * CUE_SIZE + 30] = 0x01
    else:
        sheet[30] = 0x01

    # ── Header ───────────────────────────────────────────────────────────────
    # Signature field is exactly 14 bytes, space-padded or truncated.
    sig = (SIGNATURE + b' ' * 14)[:14]
    header = (
        TAUD_MAGIC +
        bytes([TAUD_VERSION, 1]) +
        struct.pack('<I', comp_size) +
        b'\x00\x00\x00\x00' +
        sig
    )
    assert len(header) == TAUD_HEADER_SIZE

    # flags byte: bit 1 (f) = Amiga pitch-slide mode (IT linear_slides flag inverted)
    flags_byte = 0x00 if h.linear_slides else 0x02
    song_table = struct.pack('<IBHBBHfB',
        song_offset, C, num_taud_pats,
        bpm_stored, speed,
        0xA000,   # C9
        8363.0,
        flags_byte,
    )
    assert len(song_table) == TAUD_SONG_ENTRY

    return header + compressed + song_table + bytes(pat_bin) + bytes(sheet)


# ── Main ──────────────────────────────────────────────────────────────────────

def main():
    """Command-line entry point: read an .it file and write a .taud file.

    Parses the command line, loads the input file, runs the header, sample,
    instrument and pattern parsers, assembles the Taud image and writes it
    to the output path.  Instruments are parsed only when the header says
    the module uses them.
    """
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument('input', help='Input .it file')
    ap.add_argument('output', help='Output .taud file')
    ap.add_argument('-v', '--verbose', action='store_true')
    ap.add_argument('--no-decompress', action='store_true',
                    help='Treat compressed IT samples as silent (debug)')
    ap.add_argument('--no-pf-envelope', action='store_true',
                    help='Skip baking IT pitch/filter envelope onto sample copies')
    args = ap.parse_args()
    set_verbose(args.verbose)

    with open(args.input, 'rb') as f:
        data = f.read()

    vprint(f"parsing '{args.input}' ({len(data)} bytes)…")
    h = parse_it_header(data)
    vprint(f" title: '{h.title}'")
    vprint(f" orders={h.ord_count} insts={h.ins_count} "
           f"samples={h.smp_count} patterns={h.pat_count}")
    vprint(f" flags: linear={h.linear_slides} use_inst={h.use_instruments} "
           f"link_gef={h.link_gef}")

    samples = parse_samples(data, h, decompress=not args.no_decompress)
    instruments = parse_instruments(data, h) if h.use_instruments else []
    patterns_rows = parse_patterns(data, h)

    taud = assemble_taud(h, samples, instruments, patterns_rows,
                         decompress=not args.no_decompress,
                         no_pf_envelope=args.no_pf_envelope)

    with open(args.output, 'wb') as f:
        f.write(taud)

    print(f"wrote {len(taud)} bytes to '{args.output}'")
    if args.verbose:
        # Echo the leading bytes and the 14-byte signature that ends the
        # header as a quick sanity check.
        print(f" magic ok: {taud[:8].hex()}", file=sys.stderr)
        sig_off = TAUD_HEADER_SIZE - 14
        print(f" signature: {taud[sig_off:sig_off+14]}", file=sys.stderr)
|
||
|
||
# Run the converter only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|