mirror of
https://github.com/LingandRX/script_library.git
synced 2025-10-28 08:41:16 +08:00
添加ncm转mp3,flac
This commit is contained in:
parent
3855127f24
commit
e7b26f7e04
1
ncmTranslatorScript_python/README.md
Normal file
1
ncmTranslatorScript_python/README.md
Normal file
@ -0,0 +1 @@
|
||||
|
||||
149
ncmTranslatorScript_python/ncmTranslator.py
Normal file
149
ncmTranslatorScript_python/ncmTranslator.py
Normal file
@ -0,0 +1,149 @@
|
||||
import base64
|
||||
import binascii
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
from os import fspath
|
||||
from pathlib import Path
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from Crypto.Cipher import AES
|
||||
from tqdm import tqdm
|
||||
|
||||
music_suffix_list = ['mp3', 'wav', 'ape', 'flac', 'MP3', 'WAV', 'APE', 'FLAC']
|
||||
|
||||
|
||||
# 进行ncm解码
|
||||
def dump(file_path, file_name_no_suffix):
|
||||
core_key = binascii.a2b_hex("687A4852416D736F356B496E62617857")
|
||||
meta_key = binascii.a2b_hex("2331346C6A6B5F215C5D2630553C2728")
|
||||
unpad = lambda s: s[0:-(s[-1] if isinstance(s[-1], int) else ord(s[-1]))]
|
||||
with open(file_path, 'rb') as f:
|
||||
header = f.read(8)
|
||||
assert binascii.b2a_hex(header) == b'4354454e4644414d'
|
||||
f.seek(2, 1)
|
||||
key_length = struct.unpack('<I', f.read(4))[0]
|
||||
key_data = f.read(key_length)
|
||||
key_data = bytes([b ^ 0x64 for b in key_data])
|
||||
cryptor = AES.new(core_key, AES.MODE_ECB)
|
||||
key_data = unpad(cryptor.decrypt(key_data))[17:]
|
||||
key_length = len(key_data)
|
||||
key_box = bytearray(range(256))
|
||||
c = 0
|
||||
last_byte = 0
|
||||
key_offset = 0
|
||||
for i in range(256):
|
||||
swap = key_box[i]
|
||||
c = (swap + last_byte + key_data[key_offset]) & 0xff
|
||||
key_offset = (key_offset + 1) % key_length
|
||||
key_box[i], key_box[c] = key_box[c], swap
|
||||
last_byte = c
|
||||
|
||||
meta_length = struct.unpack('<I', f.read(4))[0]
|
||||
meta_data = f.read(meta_length)
|
||||
meta_data = bytes([b ^ 0x63 for b in meta_data])
|
||||
meta_data = base64.b64decode(meta_data[22:])
|
||||
cryptor = AES.new(meta_key, AES.MODE_ECB)
|
||||
meta_data = unpad(cryptor.decrypt(meta_data)).decode('utf-8')[6:]
|
||||
meta_data = json.loads(meta_data)
|
||||
|
||||
f.seek(9, 1) # 跳过crc32(4字节)和未知5字节
|
||||
image_size = struct.unpack('<I', f.read(4))[0]
|
||||
_ = f.read(image_size) # 专辑图像数据忽略
|
||||
|
||||
file_name = file_name_no_suffix + '.' + meta_data['format']
|
||||
output_path = os.path.join(os.path.split(file_path)[0], file_name)
|
||||
|
||||
with open(output_path, 'wb') as m:
|
||||
while True:
|
||||
chunk = bytearray(f.read(0x8000))
|
||||
if not chunk:
|
||||
break
|
||||
for i in range(len(chunk)):
|
||||
j = (i + 1) & 0xff
|
||||
chunk[i] ^= key_box[(key_box[j] + key_box[(key_box[j] + j) & 0xff]) & 0xff]
|
||||
m.write(chunk)
|
||||
|
||||
# ✅ 专辑图片下载已注释
|
||||
# try:
|
||||
# urllib.request.urlretrieve(meta_data['albumPic'],
|
||||
# os.path.join(os.path.split(file_path)[0], file_name_no_suffix) + '.jpg')
|
||||
# except Exception as e:
|
||||
# print('下载专辑图片出错', e)
|
||||
|
||||
return output_path # 返回生成的文件路径
|
||||
|
||||
|
||||
def file_extension(path):
|
||||
return os.path.splitext(path)[-1][1:]
|
||||
|
||||
|
||||
def file_no_extension(path):
|
||||
filename = os.path.basename(path)
|
||||
if not filename or filename.startswith('.'):
|
||||
return filename
|
||||
return os.path.splitext(filename)[0]
|
||||
|
||||
|
||||
def file_exist(file_name, file_list, file_list_path):
|
||||
base_name = file_no_extension(file_name)
|
||||
for file in file_list:
|
||||
if os.path.isdir(os.path.join(file_list_path, file)):
|
||||
continue
|
||||
for suffix in music_suffix_list:
|
||||
if (base_name + "." + suffix) == file:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def recursion(file_name, root_dir, file_list, tasks):
|
||||
full_file = os.path.join(root_dir, file_name)
|
||||
if os.path.isfile(full_file):
|
||||
if file_extension(full_file) != "ncm":
|
||||
return
|
||||
if file_exist(file_name, file_list, root_dir):
|
||||
return
|
||||
tasks.append((full_file, file_no_extension(file_name)))
|
||||
elif os.path.isdir(full_file):
|
||||
for child in os.listdir(full_file):
|
||||
recursion(child, full_file, os.listdir(full_file), tasks)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if len(sys.argv) > 1:
|
||||
rootdir = sys.argv[1]
|
||||
else:
|
||||
rootdir = fspath(Path(__file__).parent.resolve())
|
||||
print('当前需要处理的文件夹路径: ' + rootdir)
|
||||
|
||||
start_time = time.time()
|
||||
print('开始处理 ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
|
||||
|
||||
file_list = os.listdir(rootdir)
|
||||
tasks = []
|
||||
for file in file_list:
|
||||
recursion(file, rootdir, file_list, tasks)
|
||||
|
||||
total = len(tasks)
|
||||
print(f"共找到 {total} 个待转码文件")
|
||||
|
||||
with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
|
||||
futures = {executor.submit(dump, file_path, no_suffix): file_path for file_path, no_suffix in tasks}
|
||||
|
||||
with tqdm(total=total, desc="正在转码", unit="file") as pbar:
|
||||
for future in as_completed(futures):
|
||||
try:
|
||||
result_path = future.result()
|
||||
# ✅ 在进度条上方显示文件名称
|
||||
tqdm.write(f"转码完成: {result_path}")
|
||||
except Exception as e:
|
||||
logging.exception("转码任务出错", exc_info=e)
|
||||
finally:
|
||||
pbar.update(1)
|
||||
|
||||
end_time = time.time()
|
||||
print('全部文件处理完成 ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
|
||||
print('处理时间:', end_time - start_time)
|
||||
2
ncmTranslatorScript_python/requirements.txt
Normal file
2
ncmTranslatorScript_python/requirements.txt
Normal file
@ -0,0 +1,2 @@
|
||||
pycryptodome>=3.20.0
|
||||
tqdm>=4.66.0
|
||||
Loading…
Reference in New Issue
Block a user