-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathresolvepatch.py
More file actions
1065 lines (907 loc) · 40.2 KB
/
resolvepatch.py
File metadata and controls
1065 lines (907 loc) · 40.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""resolvepatch — patch DaVinci Resolve.exe and Fusion Studio's fusionsystem.dll
to bypass license checks.
Verified on:
- Resolve 20.3.2.9 and 21.0.0.28
- Fusion Studio 20.3.2 and 21.0.0.25
on Windows.
Usage (run as Administrator):
python resolvepatch.py # patch every detected target
python resolvepatch.py --restore # restore everything from .bak
python resolvepatch.py --targets resolve # only Resolve.exe
python resolvepatch.py --targets fusion # only Fusion's fusionsystem.dll(s)
python resolvepatch.py --path <p> # explicit Resolve.exe path
"""
from __future__ import annotations
import argparse
import ctypes
import logging
import msvcrt
import os
import re
import shutil
import struct
import subprocess
import sys
import time
import winreg
from pathlib import Path
from typing import Callable, Optional, Sequence, Union
logger = logging.getLogger("resolvepatch")
# --------------------------------------------------------------------- constants
DEFAULT_PATH = r"C:\Program Files\Blackmagic Design\DaVinci Resolve\Resolve.exe"
# Standard install paths for Fusion Studio. The same pattern-based patch covers
# both 20.x and 21.x — only the file offset within each DLL differs, and the
# pattern matcher resolves that automatically.
DEFAULT_FUSION_PATHS = (
r"C:\Program Files\Blackmagic Design\Fusion 21\fusionsystem.dll",
r"C:\Program Files\Blackmagic Design\Fusion 20\fusionsystem.dll",
)
# File extensions whose ShellOpen command points at Resolve.exe; used by the
# auto-locator when the standard install path doesn't exist.
SHELLOPEN_EXTENSION_KEYS = (
"ResolveBinFile",
"ResolveDrpFile",
"ResolveDBKeyFile",
"ResolveTimelineFile",
"ResolveTemplateBundle",
)
# Fake RLM license file written next to Resolve.exe; HKLM env var
# RLM_LICENSE=blackmagic.lic points the RLM library at this name.
LICENSE_FILE_CONTENTS = (
'LICENSE blackmagic davinciresolvestudio 999999 permanent uncounted\n'
' hostid=ANY issuer=ANY customer=ANY issued=14-Aug-2025\n'
' akey=0000-0000-0000-0000-0000 _ck=00 sig="00"'
)
# Pattern entry: int (0..255) for an exact byte, None for a single-byte wildcard.
PatternByte = Optional[int]
Pattern = Sequence[PatternByte]
# Replacement: either fixed bytes, or a callable computing bytes from context.
ReplacementFn = Callable[[bytearray, int, Pattern], bytes]
Replacement = Union[bytes, ReplacementFn]
class PatchError(Exception):
"""Anything that prevents the patch from proceeding cleanly."""
# --------------------------------------------------------------------- pattern matching
_compiled_pattern_cache: dict = {}
def _compile_pattern(pattern: Pattern) -> "re.Pattern[bytes]":
key = tuple(pattern)
rx = _compiled_pattern_cache.get(key)
if rx is None:
parts = [b'.' if b is None else re.escape(bytes([b])) for b in pattern]
rx = re.compile(b''.join(parts), re.DOTALL)
_compiled_pattern_cache[key] = rx
return rx
def find_all(data: bytes, pattern: Pattern, start: int = 0,
end: Optional[int] = None) -> list:
"""Return absolute offsets where `pattern` matches inside `data[start:end]`."""
if end is None:
end = len(data)
return [m.start() for m in _compile_pattern(pattern).finditer(data, start, end)]
# --------------------------------------------------------------------- replacement callables
def _je_to_jmp_preserving_target(data, addr, _sig):
"""`0F 84 b0 b1 b2 b3` (je rel32) -> `90 E9 b0 b1 b2 b3` (nop; jmp rel32).
The 32-bit displacement is read from the binary at apply-time, so the
rewritten jmp lands at exactly the je's original target regardless of
where in the function the je sits. Version-independent."""
return bytes([0x90, 0xE9]) + bytes(data[addr + 2:addr + 6])
def _force_first_jne_to_jmp(data, addr, _sig):
"""v21 chooser bypass — see PATCHES_21 entry.
Pattern is 28 bytes; bytes [22..27] are `0F 85 disp32` (jne early_exit).
Convert to `90 E9 disp32` (nop+jmp, same target via preserved displacement).
The license-check function then unconditionally takes the early-exit
success branch and never reaches dialog construction."""
return (bytes(data[addr:addr + 22])
+ bytes([0x90, 0xE9])
+ bytes(data[addr + 24:addr + 28]))
def _force_first_je_to_jmp_v20(data, addr, _sig):
"""v20.3.x chooser bypass — see PATCHES_20 entry.
Pattern is 41 bytes; bytes [35..40] are `0F 84 disp32` (je early_exit).
Same trick as the v21 callable: convert the conditional jump to an
unconditional one by overwriting `0F 84` with `90 E9` (nop+jmp) and
leaving the 32-bit displacement bytes intact, so the jmp lands exactly
where the je would have. v20.3 uses `je` instead of `jne` because the
underlying check returns 0 on success rather than non-zero."""
return (bytes(data[addr:addr + 35])
+ bytes([0x90, 0xE9])
+ bytes(data[addr + 37:addr + 41]))
def _fusion_dolicensing_je_to_jmp_epilogue(data, addr, _sig):
"""Fusion Studio activation-dialog bypass — see PATCHES_FUSION entry.
Pattern is 21 bytes; the last 2 are `74 11` (je rel8 +0x11), routing to
a THREAD_SPAWN block inside Fusion::FusionApp::DoLicensing(). Replace
those 2 bytes with `EB 71` (jmp rel8 +0x71) so control falls through to
the function's `mov al, 1; ret` epilogue instead. The first 19 bytes
(which include the version-specific [g_IsResolve] disp32) are preserved
verbatim. The `+0x71` displacement has been stable across observed
versions because the THREAD_SPAWN block has identical MSVC codegen."""
return bytes(data[addr:addr + 19]) + bytes([0xEB, 0x71])
# --------------------------------------------------------------------- patch tables
#
# Each patch is `(pattern, replacement)`.
# Resolve 21.x. Index meaning:
# 0: license-check chain function at VA 0x140D87010 (v21.0.0.28). Convert
# the first `jne 0x140D8718C` to unconditional `jmp` so the function
# always takes the early-exit success path. Without this patch v21
# shows the License Key / Blackmagic Cloud ID chooser at startup;
# the call chain (license-check fn -> 0x14497D6D0 with op-id 9 ->
# UiActivationDialogImp ctor) was confirmed via Frida.
PATCHES_21: list[tuple[Pattern, Replacement]] = [
(
[0x48, 0x89, 0x5C, 0x24, 0x10, 0x57,
0x48, 0x81, 0xEC, 0x80, 0x00, 0x00, 0x00,
0x33, 0xDB,
0xE8, None, None, None, None,
0x84, 0xC0,
0x0F, 0x85, None, None, None, None],
_force_first_jne_to_jmp,
),
]
# Resolve 20.x. Index meaning:
# 0: license-check chain function at VA 0x143B0EC60 (v20.3.2.9). Same
# dialog-construction call chain as v21 (UiActivationDialogImp ctor
# is invoked from a license-op dispatcher with op-id 9 when no
# license check returns the "OK" sentinel value). Convert the
# first `je 0x143B0EDE9` (note: je on v20, not jne — the underlying
# license check returns 0 on success here) to unconditional `jmp`,
# forcing the early-exit success path. Confirmed via Frida.
PATCHES_20: list[tuple[Pattern, Replacement]] = [
(
[0x48, 0x89, 0x5C, 0x24, 0x10, 0x57,
0x48, 0x81, 0xEC, 0x80, 0x00, 0x00, 0x00,
0x33, 0xDB,
0xE8, None, None, None, None,
0x45, 0x33, 0xC0,
0x33, 0xD2,
0x48, 0x8B, 0xC8,
0xE8, None, None, None, None,
0x85, 0xC0,
0x0F, 0x84, None, None, None, None],
_force_first_je_to_jmp_v20,
),
]
# Fusion Studio activation-dialog bypass (covers v20.3.x and v21.0.x). Inside
# Fusion::FusionApp::DoLicensing() the relevant code is:
# mov rcx, rbx ; 48 8B CB
# call qword [rax+0x220] ; FF 90 20 02 00 00 (Log "Checking for licenses...")
# mov rax, [g_IsResolve] ; 48 8B 05 disp32 (disp32 is version-specific)
# cmp byte [rax], 0 ; 80 38 00
# je THREAD_SPAWN ; 74 11
# The je conditionally enters a block that allocates Events, registers a
# callback in [g_handler], spawns a worker thread, and ResumeThread()s it.
# That worker thread is what eventually requests the activation-key dialog.
# Flipping `74 11` -> `EB 71` retargets the jmp to the function's
# `mov al, 1; ret` epilogue, skipping the THREAD_SPAWN block entirely. As a
# bonus, InitInstance's wait-for-license-message loop is gated by
# `cmp [g_handler], <addr>` and skips itself when the handler is unset.
#
# Pattern length is 21 bytes (4 wildcards). An earlier 12-byte version
# (mov rax,[mem]; cmp byte [rax],0; je +0x11) had two coincidental matches
# elsewhere in the binary — likely other "is feature flag X enabled?"
# checks — so the patcher's "matched > 1 time" guard would refuse to apply.
# Anchoring on the preceding `48 8B CB FF 90 20 02 00 00` (mov rcx, rbx +
# call vtable[0x220]) makes the match unique on both v20 and v21.
PATCHES_FUSION: "list[tuple[Pattern, Replacement]]" = [
(
[0x48, 0x8B, 0xCB,
0xFF, 0x90, 0x20, 0x02, 0x00, 0x00,
0x48, 0x8B, 0x05, None, None, None, None,
0x80, 0x38, 0x00,
0x74, 0x11],
_fusion_dolicensing_je_to_jmp_epilogue,
),
]
# --------------------------------------------------------------------- PE version
def determine_version(data: bytes) -> tuple[int, int, int]:
"""Read VS_FIXEDFILEINFO directly via its 0xFEEF04BD signature — unique
enough that we don't need to walk the full PE header."""
idx = data.find(b'\xBD\x04\xEF\xFE')
if idx < 0:
raise PatchError("Failed to parse PE header for main executable.")
file_version_ms = struct.unpack_from('<I', data, idx + 8)[0]
file_version_ls = struct.unpack_from('<I', data, idx + 12)[0]
return (
(file_version_ms >> 16) & 0xFFFF,
file_version_ms & 0xFFFF,
(file_version_ls >> 16) & 0xFFFF,
)
# --------------------------------------------------------------------- inner-function patch
def patch_4func(data: bytearray) -> None:
"""Locate a specific call (matched by an outer pattern), follow its rel32
target, and flip a few `je` -> `jne` inside the destination function
(the Dolby Vision license validator). Indices `[0]` and `[0, 1, 2]` were
chosen against v20.x and remain stable on v21.0.x.
Raises PatchError without mutating `data` if any pattern is missing —
we resolve every write site up front and only apply once everything is
found, so partial mutation can't escape this function."""
outer_pattern = [0xE8, None, None, None, None, 0x88, 0x83, None, None, None, None,
0x48, 0x8D, 0x4C, 0x24, None, 0xFF, 0x15]
occs = find_all(data, outer_pattern)
if len(occs) != 1:
raise PatchError(
f"patch_4func: outer pattern matched {len(occs)} times instead of 1."
)
call_addr = occs[0]
rel32 = struct.unpack_from('<I', data, call_addr + 1)[0]
fn_start = call_addr + 5 + rel32
inner_patches = [
([0x84, 0xC0, 0x0F, 0x84], bytes([0x84, 0xC0, 0x0F, 0x85]), [0]),
([0x85, 0xDB, 0x0F, 0x84], bytes([0x85, 0xDB, 0x0F, 0x85]), [0, 1, 2]),
]
# Resolve every write site up front so a missing inner pattern aborts
# before any byte is touched.
writes: list = []
for sub_pat, repl, idxs in inner_patches:
occs = find_all(data, sub_pat, fn_start, fn_start + 0x1000)
for i in idxs:
if i >= len(occs):
raise PatchError(
f"patch_4func: inner pattern {bytes(b for b in sub_pat).hex()} "
f"index {i} not found (only {len(occs)} hits in window)."
)
writes.append((occs[i], repl))
for off, repl in writes:
data[off:off + len(repl)] = repl
# --------------------------------------------------------------------- license file & env var
def configure_license_file(resolve_path: str) -> None:
"""Write the fake RLM license next to Resolve.exe and set HKLM
RLM_LICENSE=blackmagic.lic. Broadcasts WM_SETTINGCHANGE so newly-spawned
processes pick up the env var without a logout/reboot."""
lic_path = Path(resolve_path).with_name("blackmagic.lic")
try:
lic_path.write_text(LICENSE_FILE_CONTENTS)
logger.info("wrote license file: %s", lic_path)
except OSError as e:
raise PatchError(f"failed to write license file: {e}") from e
try:
with winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE,
r"System\CurrentControlSet\Control\Session Manager\Environment",
0, winreg.KEY_SET_VALUE,
) as key:
winreg.SetValueEx(key, "RLM_LICENSE", 0, winreg.REG_SZ, "blackmagic.lic")
logger.info("set HKLM RLM_LICENSE=blackmagic.lic")
except OSError as e:
raise PatchError(
f"failed to set HKLM RLM_LICENSE (set it manually if needed): {e}"
) from e
HWND_BROADCAST = 0xFFFF
WM_SETTINGCHANGE = 0x001A
SMTO_ABORTIFHUNG = 0x0002
try:
result = ctypes.c_long()
ctypes.windll.user32.SendMessageTimeoutW(
HWND_BROADCAST, WM_SETTINGCHANGE, 0,
"Environment", SMTO_ABORTIFHUNG, 5000, ctypes.byref(result),
)
logger.info("broadcast WM_SETTINGCHANGE so new shells/Explorer see RLM_LICENSE")
except OSError as e:
logger.warning("failed to broadcast WM_SETTINGCHANGE: %s", e)
# --------------------------------------------------------------------- atomic write
def _atomic_write_with_retry(target: str, payload: bytes, action: str) -> None:
"""Write `payload` to `target` via `target + .new` + os.replace, retrying
up to 5x on transient locks (AV scanner, Explorer preview thumbnailer, etc.).
`action` is a short label used in error messages ("write" / "restore")."""
tmp_path = target + ".new"
last_err: Optional[OSError] = None
for attempt in range(5):
try:
with open(tmp_path, "wb") as f:
f.write(payload)
os.replace(tmp_path, target)
return
except OSError as e:
last_err = e
if os.path.exists(tmp_path):
try:
os.remove(tmp_path)
except OSError:
pass
logger.warning("%s attempt %d/5 failed: %s — retrying in 2s",
action, attempt + 1, e)
time.sleep(2)
raise PatchError(
f"Unable to {action} Resolve.exe: {last_err}. "
"Make sure Resolve isn't running, close any Explorer window showing the folder, "
"and run this script as Administrator."
)
# --------------------------------------------------------------------- main patch / restore
def _select_patches(version: tuple[int, int, int]) -> list[tuple[Pattern, Replacement]]:
"""Pick the patch table for the detected version. Only v20.x and v21.x
are supported."""
major, minor, micro = version
if major == 20:
return PATCHES_20
if major == 21:
return PATCHES_21
raise PatchError(
f"Resolve {major}.{minor}.{micro} is unsupported. "
"This patcher only works on v20.x and v21.x."
)
def patch(resolve_path: str) -> None:
"""Patch Resolve.exe in place. Backs up to <path>.bak first if and only
if any patch actually modified the binary."""
try:
with open(resolve_path, "rb") as f:
data = bytearray(f.read())
except OSError:
raise PatchError("Resolve could not be located.")
version = determine_version(data)
logger.info("detected Resolve version %d.%d.%d", *version)
patches = _select_patches(version)
modified = False
for i, (sig, replacement) in enumerate(patches):
occs = find_all(data, sig)
if not occs:
logger.info("patch[%d]: no match (already patched or layout differs)", i)
continue
if len(occs) > 1:
logger.warning("patch[%d]: matched %d times — skipping", i, len(occs))
continue
addr = occs[0]
repl_bytes = replacement(data, addr, sig) if callable(replacement) else replacement
logger.info("patch[%d]: applying at file offset 0x%08X (%d bytes)",
i, addr, len(repl_bytes))
data[addr:addr + len(repl_bytes)] = repl_bytes
modified = True
if version[0] >= 20:
try:
patch_4func(data)
logger.info("patch_4func: applied")
modified = True
except PatchError as e:
logger.warning("patch_4func: %s (continuing anyway)", e)
if not modified:
raise PatchError(
"No patches applied. Either this version is unsupported, the "
"binary is already patched, or the byte offsets have shifted."
)
try:
shutil.copy(resolve_path, resolve_path + ".bak")
except OSError as e:
raise PatchError(f"Unable to backup Resolve.exe: {e}") from e
# bytearray is bytes-like — pass directly to avoid copying ~640 MB.
_atomic_write_with_retry(resolve_path, data, action="write")
try:
configure_license_file(resolve_path)
except PatchError as e:
# License file is best-effort; the binary patch already succeeded.
logger.warning("configure_license_file failed: %s", e)
def restore(resolve_path: str) -> None:
"""Restore Resolve.exe from <path>.bak. Warns if the .bak's PE version
differs from the current binary's, since restoring across an upgrade
would replace the new install with the old one."""
bak = resolve_path + ".bak"
if not Path(bak).exists():
raise PatchError(f"No backup found at {bak}")
with open(bak, "rb") as f:
bak_data = f.read()
# Cross-version sanity check: same major.minor.micro on both sides. The
# VS_FIXEDFILEINFO signature lives in the resource section near the end
# of the binary, so we have to scan the whole file (transient — released
# after this block).
try:
bak_version = determine_version(bak_data)
except PatchError:
bak_version = None
cur_version = None
try:
with open(resolve_path, "rb") as f:
cur_data = f.read()
cur_version = determine_version(cur_data)
del cur_data
except (OSError, PatchError):
pass
if bak_version and cur_version and bak_version != cur_version:
logger.warning(
"backup is %d.%d.%d but current Resolve.exe is %d.%d.%d — "
"restore would downgrade. Aborting. Delete %s manually if "
"you really want to overwrite.",
*bak_version, *cur_version, bak,
)
raise PatchError("backup version mismatch")
_atomic_write_with_retry(resolve_path, bak_data, action="restore")
logger.info("restored %s from %s", resolve_path, bak)
# --------------------------------------------------------------------- state detection (no writes)
def state_of_resolve(resolve_path: str) -> str:
"""Classify a Resolve.exe install without modifying it.
Returns one of: MISSING (file not present), UNSUPPORTED (version not v20/v21),
UNPATCHED (the gate pattern still matches), PATCHED (no match — likely already
patched, or the pattern shifted in a future build)."""
if not Path(resolve_path).exists():
return "MISSING"
try:
with open(resolve_path, "rb") as f:
data = f.read()
version = determine_version(data)
except (OSError, PatchError):
return "UNSUPPORTED"
try:
patches = _select_patches(version)
except PatchError:
return "UNSUPPORTED"
for sig, _ in patches:
if find_all(data, sig):
return "UNPATCHED"
return "PATCHED"
def state_of_fusion(dll_path: str) -> str:
"""Classify a fusionsystem.dll install without modifying it.
Returns one of: MISSING / UNPATCHED / PATCHED."""
if not Path(dll_path).exists():
return "MISSING"
with open(dll_path, "rb") as f:
data = f.read()
for sig, _ in PATCHES_FUSION:
if find_all(data, sig):
return "UNPATCHED"
return "PATCHED"
# --------------------------------------------------------------------- fusion patch / restore / locate
def patch_fusion(dll_path: str) -> None:
"""Patch fusionsystem.dll in place. Backs up to <path>.bak first if and
only if any patch actually modified the binary. Same atomic-write +
verify-by-readback pattern as the Resolve patch."""
try:
with open(dll_path, "rb") as f:
data = bytearray(f.read())
except OSError as e:
raise PatchError(f"could not read {dll_path}: {e}") from e
modified = False
for i, (sig, replacement) in enumerate(PATCHES_FUSION):
occs = find_all(data, sig)
if not occs:
logger.info("fusion patch[%d]: no match (already patched, "
"or this version's layout differs)", i)
continue
if len(occs) > 1:
logger.warning("fusion patch[%d]: matched %d times — skipping",
i, len(occs))
continue
addr = occs[0]
repl_bytes = replacement(data, addr, sig) if callable(replacement) else replacement
logger.info("fusion patch[%d]: applying at file offset 0x%08X (%d bytes)",
i, addr, len(repl_bytes))
data[addr:addr + len(repl_bytes)] = repl_bytes
modified = True
if not modified:
raise PatchError(
f"No Fusion patches applied to {dll_path}. Either it's already "
"patched, this Fusion version is unsupported, or offsets shifted."
)
try:
shutil.copy(dll_path, dll_path + ".bak")
except OSError as e:
raise PatchError(f"unable to backup {dll_path}: {e}") from e
_atomic_write_with_retry(dll_path, bytes(data), action="write")
def restore_fusion(dll_path: str) -> None:
"""Restore fusionsystem.dll from <path>.bak. Unlike Resolve.exe there is
no embedded VS_FIXEDFILEINFO version on the DLL itself, so we skip the
cross-version sanity check — version is encoded in the install path and
each Fusion version has its own .bak."""
bak = dll_path + ".bak"
if not Path(bak).exists():
raise PatchError(f"No backup found at {bak}")
with open(bak, "rb") as f:
bak_data = f.read()
_atomic_write_with_retry(dll_path, bak_data, action="restore")
logger.info("restored %s from %s", dll_path, bak)
def locate_fusion() -> "list[str]":
"""Return all Fusion fusionsystem.dll paths actually present on disk."""
return [p for p in DEFAULT_FUSION_PATHS if Path(p).exists()]
def _kill_fusion() -> None:
"""Best-effort kill of any running Fusion processes that would lock the
DLL. Silently ignores missing processes."""
for image in ("Fusion.exe", "FusionServer.exe"):
try:
subprocess.run(["taskkill", "/F", "/IM", image],
capture_output=True, text=True)
except OSError:
pass
# --------------------------------------------------------------------- locate
def _path_from_shellopen() -> Optional[str]:
"""Look up Resolve.exe via the registered shell open-command for known
Resolve file types. The registry value looks like:
"C:\\path\\Resolve.exe" "%1"
so we strip the leading quote and trailing ` "%1"` (6 chars)."""
for typ in SHELLOPEN_EXTENSION_KEYS:
try:
with winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
rf"Software\Classes\{typ}\shell\open\command",
) as key:
value, _ = winreg.QueryValueEx(key, "")
except OSError:
continue
path = value[1:-6]
if Path(path).exists():
return path
return None
def locate() -> str:
"""Find Resolve.exe via the registry, falling back to the standard install path."""
path = _path_from_shellopen()
if path is not None:
logger.info("Resolve found via regkey: %s", path)
return path
if Path(DEFAULT_PATH).exists():
return DEFAULT_PATH
raise PatchError("Resolve could not be located.")
# --------------------------------------------------------------------- CLI
def _build_arg_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
description="Patch DaVinci Resolve.exe and/or Fusion Studio's "
"fusionsystem.dll (v20.x / v21.x). Run as Administrator.",
)
p.add_argument("--restore", action="store_true",
help="restore from .bak files and exit (applies to all selected targets)")
p.add_argument("--path", default=None,
help="explicit path to Resolve.exe (skips auto-locate)")
p.add_argument("--targets", default=None,
help="non-interactive: comma-separated targets (resolve, fusion, all). "
"If omitted on a TTY, an interactive menu is shown instead.")
return p
def _require_admin() -> None:
"""Exit immediately if not running as Administrator on Windows.
On non-Windows, fail fast — the patcher uses winreg/ctypes-windll
further down and would crash anyway."""
if sys.platform != "win32":
logger.error("This script only runs on Windows.")
raise SystemExit(1)
if not ctypes.windll.shell32.IsUserAnAdmin():
logger.error("This script must be run as Administrator.")
logger.error("Right-click your terminal/PowerShell and choose 'Run as administrator'.")
raise SystemExit(1)
def _kill_resolve() -> bool:
"""Kill any running Resolve.exe. Returns True if a process was killed,
False if there was nothing to kill or the kill failed."""
try:
result = subprocess.run(
["taskkill", "/F", "/IM", "Resolve.exe"],
capture_output=True,
text=True,
)
except OSError as e:
logger.debug("taskkill unavailable: %s", e)
return False
if result.returncode == 0:
logger.info("killed running Resolve.exe")
return True
# taskkill returns 128 when no matching process exists — that's fine.
return False
def _parse_targets(spec: str) -> "set[str]":
"""`--targets all` expands to {resolve, fusion}; comma-separated names
are validated against that set. Unknown names raise SystemExit via the
arg parser convention used elsewhere."""
valid = {"resolve", "fusion"}
raw = {t.strip().lower() for t in spec.split(",") if t.strip()}
if "all" in raw:
return valid
bad = raw - valid
if bad:
logger.error("unknown --targets value(s): %s (valid: resolve, fusion, all)",
", ".join(sorted(bad)))
raise SystemExit(2)
return raw
# --------------------------------------------------------------------- interactive menu
# A target row in the menu: (kind, label, path, state).
# kind ∈ {"resolve", "fusion21", "fusion20"} — used by _execute to dispatch.
def _build_target_rows(resolve_path: Optional[str]) -> "list[tuple[str, str, str, str]]":
rows: list[tuple[str, str, str, str]] = []
if resolve_path is not None:
rows.append(("resolve",
"DaVinci Resolve.exe",
resolve_path,
state_of_resolve(resolve_path)))
else:
rows.append(("resolve",
"DaVinci Resolve.exe",
DEFAULT_PATH,
"MISSING"))
for fp in DEFAULT_FUSION_PATHS:
version = "21" if "Fusion 21" in fp else "20"
rows.append((f"fusion{version}",
f"Fusion Studio {version} (fusionsystem.dll)",
fp,
state_of_fusion(fp)))
return rows
def _prompt(question: str) -> str:
"""input() that gracefully handles Ctrl-C / EOF by returning ''."""
try:
return input(question)
except (KeyboardInterrupt, EOFError):
print()
return ""
# ----- arrow-key UI primitives (Windows console via msvcrt + ANSI escapes) -----
def _enable_ansi() -> None:
"""Best-effort: turn on ENABLE_VIRTUAL_TERMINAL_PROCESSING so ANSI escape
codes (cursor up, line clear) are interpreted instead of printed literally.
Modern Windows terminals have this on by default; this is a safety net for
older console hosts."""
try:
STD_OUTPUT_HANDLE = -11
ENABLE_VT = 0x0004
kernel32 = ctypes.windll.kernel32
h = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
mode = ctypes.c_ulong()
if kernel32.GetConsoleMode(h, ctypes.byref(mode)):
kernel32.SetConsoleMode(h, mode.value | ENABLE_VT)
except Exception:
pass
def _read_key() -> str:
"""Block on a single keypress. Returns a normalized token:
'up' / 'down' / 'left' / 'right' / 'enter' / 'space' / 'escape',
or a lowercased single character (e.g. 'a', 'q'), or '' for unknown.
Raises KeyboardInterrupt on Ctrl-C."""
ch = msvcrt.getch()
# Special keys are two-byte sequences: \xe0 (or \x00) followed by a code.
if ch in (b'\xe0', b'\x00'):
ch2 = msvcrt.getch()
return {b'H': 'up', b'P': 'down', b'K': 'left', b'M': 'right'}.get(ch2, '')
if ch in (b'\r', b'\n'):
return 'enter'
if ch == b'\x1b':
return 'escape'
if ch == b' ':
return 'space'
if ch == b'\x03':
raise KeyboardInterrupt
try:
return ch.decode('utf-8', errors='replace').lower()
except UnicodeDecodeError:
return ''
class _RedrawRegion:
"""In-place menu redraw via ANSI cursor-up + line-clear. The first
`render()` just prints; subsequent calls overwrite the previous render
by going up N lines, clearing each, and re-printing.
Multi-line strings (i.e. items in `lines` that contain '\\n') are split
so each visual line gets its own `\\x1b[2K` clear and is counted in
`self.lines`. Without this split, the cursor-up would undercount and
drift downward by one line per redraw — leaving a growing gap above
the menu."""
def __init__(self) -> None:
self.lines: int = 0
def render(self, lines: "list[str]") -> None:
flat: list[str] = []
for line in lines:
flat.extend(line.split('\n'))
if self.lines:
sys.stdout.write(f'\r\x1b[{self.lines}A')
for line in flat:
sys.stdout.write('\x1b[2K' + line + '\n')
self.lines = len(flat)
sys.stdout.flush()
def _arrow_single_select(header: str, options: "list[str]",
footer: str = "") -> Optional[int]:
"""Single-select arrow-key menu. Returns the chosen index, or None
if the user pressed q/escape."""
cursor = 0
region = _RedrawRegion()
while True:
lines = [header, ""]
for i, opt in enumerate(options):
mark = ">" if i == cursor else " "
lines.append(f" {mark} {opt}")
if footer:
lines.extend(["", footer])
region.render(lines)
try:
key = _read_key()
except KeyboardInterrupt:
return None
if key == 'up' and cursor > 0:
cursor -= 1
elif key == 'down' and cursor < len(options) - 1:
cursor += 1
elif key == 'enter':
return cursor
elif key in ('escape', 'q'):
return None
def _arrow_multi_select(header: str, options: "list[str]",
                        preselected: "Optional[set[int]]" = None,
                        footer: str = "") -> "Optional[set[int]]":
    """Multi-select arrow-key menu.

    Each row shows a cursor marker ('>') and a checkbox ('[x]' / '[ ]').
    Up/Down move the cursor and stop at the first/last row, SPACE toggles
    the current row, A toggles all-vs-none (clears when every row is
    selected, otherwise selects every row), ENTER confirms.

    `preselected` holds the indices that start out checked; it is copied,
    never mutated, and indices outside `options` are ignored.

    Returns the set of chosen indices, or None on cancel (q/escape or
    Ctrl-C), if the user confirms with nothing selected, or if `options`
    is empty."""
    if not options:
        # Nothing to choose from. Without this guard SPACE would add the
        # non-existent index 0 and ENTER would return it.
        return None

    total = len(options)
    cursor = 0
    # Drop out-of-range indices: they would be handed back to the caller
    # and would also break the "everything selected" test used by A.
    selected: "set[int]" = {i for i in (preselected or ()) if 0 <= i < total}
    region = _RedrawRegion()
    while True:
        lines = [header, ""]
        for i, opt in enumerate(options):
            arrow = ">" if i == cursor else " "
            box = "[x]" if i in selected else "[ ]"
            lines.append(f" {arrow} {box} {opt}")
        if footer:
            lines.extend(["", footer])
        region.render(lines)

        try:
            key = _read_key()
        except KeyboardInterrupt:
            # Ctrl-C is treated like a cancel rather than aborting the program.
            return None

        if key == 'up' and cursor > 0:
            cursor -= 1
        elif key == 'down' and cursor < total - 1:
            cursor += 1
        elif key == 'space':
            if cursor in selected:
                selected.discard(cursor)
            else:
                selected.add(cursor)
        elif key == 'a':
            selected = set() if len(selected) == total else set(range(total))
        elif key == 'enter':
            return selected if selected else None
        elif key in ('escape', 'q'):
            return None
        # Any other key: fall through and redraw unchanged.
# ----- the menu itself --------------------------------------------------------
def interactive_menu(resolve_path: Optional[str],
                     action_filter: Optional[str] = None
                     ) -> "tuple[Optional[str], list[tuple[str, str]]]":
    """Interactive arrow-key front end for choosing an action and targets.

    Flow: print the banner, pick the action (skipped when `action_filter`
    is 'patch' or 'restore', as the smart `--restore` path does), pick the
    targets, show a summary, then ask for a final yes/no.

    Target rows come from `_build_target_rows` and are used here as
    (kind, label, path, state) tuples.
      * 'patch'   - rows in state UNPATCHED, PATCHED or UNSUPPORTED are
                    listed; only the UNPATCHED ones start checked.
      * 'restore' - only PATCHED rows are listed; all start checked.

    Returns (action, [(kind, path), ...]) on confirmation, or (None, [])
    if the user cancels at any step or there is nothing to act on."""
    all_rows = _build_target_rows(resolve_path)

    rule = "=" * 72
    print()
    print(rule)
    print(" Blackmagic Design Patcher - DaVinci Resolve & Fusion")
    print(rule)

    # Step 1: which action? Skipped when the caller already decided.
    action = action_filter
    if action is None:
        picked = _arrow_single_select(
            header="\nWhat do you want to do?",
            options=["Patch installed targets",
                     "Restore from .bak",
                     "Quit"],
            footer="(Up/Down to move, Enter to confirm, q/Esc to cancel)",
        )
        if picked is None or picked == 2:
            return None, []
        action = "restore" if picked != 0 else "patch"

    # Step 2: narrow the rows to those the action applies to.
    patching = action == "patch"
    if patching:
        listed_states = ("UNPATCHED", "PATCHED", "UNSUPPORTED")
        relevant = [row for row in all_rows if row[3] in listed_states]
        preselect = {pos for pos, row in enumerate(relevant)
                     if row[3] == "UNPATCHED"}
    else:  # restore
        relevant = [row for row in all_rows if row[3] == "PATCHED"]
        preselect = set(range(len(relevant)))

    if not relevant:
        if patching:
            print("\nNo installed targets to patch.\n")
        else:
            print("\nNothing to restore (no targets are currently patched).\n")
        return None, []

    # Step 3: let the user tick the targets.
    option_lines = []
    for _kind, label, _path, state in relevant:
        option_lines.append(f"{label:42s} {state}")
    verb = action.upper()
    chosen_idx = _arrow_multi_select(
        header=f"\nSelect targets to {verb}:",
        options=option_lines,
        preselected=preselect,
        footer="(Up/Down move, Space toggle, A all/none, Enter confirm, q/Esc cancel)",
    )
    if not chosen_idx:
        print("\nCancelled (no targets selected).\n")
        return None, []

    # Step 4: summary plus final confirmation.
    ordered = sorted(chosen_idx)
    print()
    print(f"About to {verb}:")
    for pos in ordered:
        _kind, label, _path, state = relevant[pos]
        if patching and state == "PATCHED":
            note = " (already patched - will be a no-op)"
        elif action == "restore" and state != "PATCHED":
            note = f" (current state: {state})"
        else:
            note = ""
        print(f" - {label}{note}")
    print()

    confirm_idx = _arrow_single_select(
        header="Continue?",
        options=["Yes, proceed", "No, cancel"],
        footer="(Up/Down + Enter, or q/Esc to cancel)",
    )
    if confirm_idx != 0:
        print("\nCancelled.\n")
        return None, []

    return action, [(relevant[pos][0], relevant[pos][2]) for pos in ordered]
def _execute(action: str, chosen: "list[tuple[str, str]]") -> int:
    """Apply `action` ('restore', anything else means patch) to every
    (kind, path) pair in `chosen`.

    Entries of kind "resolve" are processed first, then those whose kind
    starts with "fusion". The matching kill helper is only called when at
    least one target of that family was chosen, so Fusion is never killed
    when only Resolve was selected, and vice versa.

    A PatchError on one target is logged and does not stop the remaining
    targets. Returns 0 if every target succeeded, 1 if any failed."""
    restoring = action == "restore"
    exit_code = 0

    resolve_paths = [path for kind, path in chosen if kind == "resolve"]
    fusion_paths = [path for kind, path in chosen if kind.startswith("fusion")]

    if resolve_paths:
        _kill_resolve()
        for path in resolve_paths:
            try:
                if restoring:
                    logger.info("attempting to restore Resolve.exe!")
                    restore(path)
                    logger.info("successfully restored Resolve.exe")
                else:
                    logger.info("attempting to patch Resolve.exe!")
                    patch(path)
                    logger.info("successfully patched Resolve.exe")
            except PatchError as err:
                logger.error("Resolve failed: %s", err)
                exit_code = 1

    if fusion_paths:
        _kill_fusion()
        for path in fusion_paths:
            try:
                if restoring:
                    logger.info("attempting to restore: %s", path)
                    restore_fusion(path)
                    logger.info("successfully restored: %s", path)
                else:
                    logger.info("attempting to patch: %s", path)
                    patch_fusion(path)
                    logger.info("successfully patched: %s", path)
            except PatchError as err:
                logger.error("Fusion failed (%s): %s", path, err)
                exit_code = 1

    return exit_code
def _resolve_chosen_from_cli(targets_spec: str, resolve_path: Optional[str]) -> "list[tuple[str, str]]":
    """Build the [(kind, path), ...] list that `_execute` takes from a
    `--targets` string.

    "resolve" contributes one entry, and only when `resolve_path` is not
    None. "fusion" contributes one entry per path returned by
    `locate_fusion()`; the kind is "fusion21" when the path contains
    "Fusion 21" and "fusion20" otherwise. Targets without an install
    simply produce no entry - the same behaviour as the menu."""
    wanted = _parse_targets(targets_spec)
    picked: "list[tuple[str, str]]" = []

    if "resolve" in wanted and resolve_path is not None:
        picked.append(("resolve", resolve_path))

    if "fusion" in wanted:
        picked.extend(
            ("fusion21" if "Fusion 21" in fusion_path else "fusion20", fusion_path)
            for fusion_path in locate_fusion()
        )

    return picked
def main() -> int: