From bf69a4fd35378d83b5cc6a32714edcaeccaefb9c Mon Sep 17 00:00:00 2001 From: "David A. van Leeuwen" Date: Mon, 13 Apr 2026 21:18:56 +0200 Subject: [PATCH 1/3] Fix issue in returning cursor after print_above() --- src/meshcore_cli/meshcore_cli.py | 70 ++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 26 deletions(-) diff --git a/src/meshcore_cli/meshcore_cli.py b/src/meshcore_cli/meshcore_cli.py index 021d70e..96535f3 100644 --- a/src/meshcore_cli/meshcore_cli.py +++ b/src/meshcore_cli/meshcore_cli.py @@ -12,8 +12,10 @@ import serial.tools.list_ports from pathlib import Path import traceback +from prompt_toolkit.application.current import get_app_or_none from prompt_toolkit.shortcuts import PromptSession from prompt_toolkit.shortcuts import CompleteStyle +from prompt_toolkit.shortcuts import print_formatted_text from prompt_toolkit.completion import NestedCompleter, PathCompleter from prompt_toolkit.completion import CompleteEvent, Completer, Completion from prompt_toolkit.history import FileHistory @@ -98,26 +100,42 @@ def escape_ansi(line): ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]') return ansi_escape.sub('', line) -def print_one_line_above(str): - """ prints a string above current line """ - width = os.get_terminal_size().columns - stringlen = len(escape_ansi(str))-1 - lines = divmod(stringlen, width)[0] + 1 - print("\u001B[s", end="") # Save current cursor position - print("\u001B[A", end="") # Move cursor up one line - print("\u001B[999D", end="") # Move cursor to beginning of line - for _ in range(lines): - print("\u001B[S", end="") # Scroll up/pan window down 1 line - print("\u001B[L", end="") # Insert new line - for _ in range(lines - 1): - print("\u001B[A", end="") # Move cursor up one line - print(str, end="") # Print output status msg - print("\u001B[u", end="", flush=True) # Jump back to saved cursor position - -def print_above(str): - lines = str.split('\n') - for l in lines: - print_one_line_above(l) +def print_one_line_above(text): + """Status line(s) above the prompt; delegates to print_above.""" + print_above(text) + +def print_above(text): + """ + Show text above the current input line. + + When prompt_toolkit has a running Application (interactive prompt), plain + print() and DEC save/restore fight the renderer — use print_formatted_text, + which suspends the UI, writes via the Output layer (UTF-8), and redraws. + + With no active app (scripts, pipes), use CSI save/move/clear/restore only. + """ + if text == "": + return + if get_app_or_none() is not None: + print_formatted_text(ANSI(text), end="\n") + return + lines = text.split("\n") + n = len(lines) + chunks = ["\033[s", f"\033[{n}A"] + for i, line in enumerate(lines): + chunks.append("\033[1G\033[2K") + chunks.append(line) + if i < n - 1: + chunks.append("\033[B") + chunks.append("\033[u") + combined = "".join(chunks) + enc = getattr(sys.stdout, "encoding", None) or "utf-8" + try: + sys.stdout.buffer.write(combined.encode(enc, errors="replace")) + sys.stdout.buffer.flush() + except (AttributeError, TypeError): + sys.stdout.write(combined) + sys.stdout.flush() async def process_event_message(mc, ev, json_output, end="\n", above=False): """ display incoming message """ @@ -2785,7 +2803,7 @@ async def next_cmd(mc, cmds, json_output=False): sess = PromptSession("Password: ", is_password=True) password = await sess.prompt_async() - timeout = 0 if not "timeout" in contact else contact["timeout"] + timeout = 0 if not "timeout" in contact else contact["timeout"] res = await mc.commands.send_login_sync(contact, password, timeout = timeout) logger.debug(res) if res is None: @@ -3373,7 +3391,7 @@ async def next_cmd(mc, cmds, json_output=False): else: if json_output: print(json.dumps(res.payload)) - else : + else: path_len = res.payload['path_len'] if (path_len == 0) : print("0 hop") @@ -3940,7 +3958,7 @@ def get_help_for (cmdname, context="line") : elif cmdname == "trace" or cmdname == "tr" : print("""Trace -Trace is a command used to get signal information (SNR) along a path. +Trace is a command used to get signal information (SNR) along a path. Basic call to trace takes the path to follow as an argument, specifying each repeater along the path with its hash (separated or not with a comma). @@ -3993,9 +4011,9 @@ def get_help_for (cmdname, context="line") : * advert_path : path taken by an advert * disc_path : discover in and out path for a contact -When using change_path, you specify manually the path to the contact. Path is given as an hex string containing hashes for all repeaters in the way (you can use commas to separate hashes). By default hash_size will be the one of the node. If using commas, it will be guessed from first hash. You can also use a colon to specify path_hash_mode. +When using change_path, you specify manually the path to the contact. Path is given as an hex string containing hashes for all repeaters in the way (you can use commas to separate hashes). By default hash_size will be the one of the node. If using commas, it will be guessed from first hash. You can also use a colon to specify path_hash_mode. -If you want to set the path for a node through 112233 445566 778899, you can use +If you want to set the path for a node through 112233 445566 778899, you can use - 114477:0 or 11,44,77 for one byte hash - 112244557788:1 or 1122,4455,7788 for two byte hash - 112233445566778899:2 or 112233,445566,778899 for three byte hash @@ -4003,7 +4021,7 @@ def get_help_for (cmdname, context="line") : To set an empty path use 0. To get the path for a contact, you can use three commands: - - path will gives you the path stored in the node. + - path will gives you the path stored in the node. - You can also get a path from a key using advert_path which will give you the path taken for last advert from that node to come. disc_path will send a path request and give you input and output path for a node. From c3f7257777d1623b5d12dd5c0dd21fb06e72a08e Mon Sep 17 00:00:00 2001 From: "David A. van Leeuwen" Date: Wed, 15 Apr 2026 10:46:35 +0200 Subject: [PATCH 2/3] Fix width calculations for right-aligned texts --- src/meshcore_cli/meshcore_cli.py | 41 +++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/src/meshcore_cli/meshcore_cli.py b/src/meshcore_cli/meshcore_cli.py index 96535f3..2526fff 100644 --- a/src/meshcore_cli/meshcore_cli.py +++ b/src/meshcore_cli/meshcore_cli.py @@ -24,6 +24,7 @@ from prompt_toolkit.shortcuts import radiolist_dialog from prompt_toolkit.completion.word_completer import WordCompleter from prompt_toolkit.document import Document +from prompt_toolkit.utils import get_cwidth try: from bleak import BleakScanner, BleakClient @@ -100,6 +101,19 @@ def escape_ansi(line): ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]') return ansi_escape.sub('', line) +def truncate_to_display_width(text, max_width): + """Return the longest prefix of text whose terminal display width is <= max_width.""" + if max_width <= 0: + return "" + lo, hi = 0, len(text) + while lo < hi: + mid = (lo + hi + 1) // 2 + if get_cwidth(text[:mid]) <= max_width: + lo = mid + else: + hi = mid - 1 + return text[:lo] + def print_one_line_above(text): """Status line(s) above the prompt; delegates to print_above.""" print_above(text) @@ -270,9 +284,30 @@ async def handle_log_rx(event): if chan_name != "" : width = os.get_terminal_size().columns - cars = width - 13 - len(event.payload["path"]) - len(chan_name) - 1 - dispmsg = message.replace("\n","")[0:cars] - txt = f"{ANSI_LIGHT_GRAY}{chan_name} {ANSI_DGREEN}{dispmsg+(cars-len(dispmsg))*' '} {ANSI_START}{width-11-len(event.payload['path'])}G{ANSI_YELLOW}[{event.payload['path']}]{ANSI_LIGHT_GRAY}{event.payload['snr']:6,.2f}{event.payload['rssi']:4}{ANSI_END}" + path = event.payload["path"] + snr_str = f"{event.payload['snr']:6,.2f}" + rssi_str = f"{event.payload['rssi']:4}" + rhs_plain = f"[{path}]{snr_str}{rssi_str}" + rhs_w = get_cwidth(rhs_plain) + rhs_col = max(1, width - rhs_w + 1) + prefix_plain = chan_name + " " + prefix_w = get_cwidth(prefix_plain) + trailing_before_csi_w = 1 + max_msg_w = max(0, rhs_col - 1 - prefix_w - trailing_before_csi_w) + raw_msg = message.replace("\n", "") + dispmsg = truncate_to_display_width(raw_msg, max_msg_w) + while True: + left_w = get_cwidth(prefix_plain + dispmsg + " ") + if left_w >= rhs_col - 1: + break + nxt = dispmsg + " " + if get_cwidth(prefix_plain + nxt + " ") > rhs_col - 1: + break + dispmsg = nxt + txt = ( + f"{ANSI_LIGHT_GRAY}{prefix_plain}{ANSI_DGREEN}{dispmsg} " + f"{ANSI_START}{rhs_col}G{ANSI_YELLOW}[{path}]{ANSI_LIGHT_GRAY}{snr_str}{rssi_str}{ANSI_END}" + ) if handle_message.above: print_above(txt) From 66473ce79feec15e4d485999474696b61e5a07f3 Mon Sep 17 00:00:00 2001 From: "David A. van Leeuwen" Date: Sun, 3 May 2026 11:28:18 +0200 Subject: [PATCH 3/3] Many more little wide character / prompt corrections Lots of testing through complicated ssh / mosh tunnels and tmux / screen session managers. Seems more stable now --- src/meshcore_cli/meshcore_cli.py | 133 ++++++++++++++++++++----------- 1 file changed, 87 insertions(+), 46 deletions(-) diff --git a/src/meshcore_cli/meshcore_cli.py b/src/meshcore_cli/meshcore_cli.py index 2526fff..e1db5fb 100644 --- a/src/meshcore_cli/meshcore_cli.py +++ b/src/meshcore_cli/meshcore_cli.py @@ -4,7 +4,8 @@ """ import asyncio -import os, sys, io, platform +import os, sys, io +import unicodedata import time, datetime import getopt, json, shlex, re import logging @@ -12,7 +13,6 @@ import serial.tools.list_ports from pathlib import Path import traceback -from prompt_toolkit.application.current import get_app_or_none from prompt_toolkit.shortcuts import PromptSession from prompt_toolkit.shortcuts import CompleteStyle from prompt_toolkit.shortcuts import print_formatted_text @@ -101,6 +101,33 @@ def escape_ansi(line): ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]') return ansi_escape.sub('', line) +def sanitize_prompt_display_text(text): + """ + Prompt-only: strip fragments that confuse terminal vs wcwidth alignment (emoji, + wide punctuation/symbols) so prompt_toolkit cursor keys match the screen. + + Do not use this for contact names in listings or completion — those must stay + identical to device strings (including emoji). Keep letters and numbers even + when east-asian width so names in common scripts still display in the prompt. + """ + if not text: + return text + out = [] + for ch in text: + try: + w = get_cwidth(ch) + except Exception: + w = 1 + if w == 0: + out.append(ch) + continue + cat = unicodedata.category(ch) + if w > 1 and cat[0] not in ("L", "N"): + out.append("?") + else: + out.append(ch) + return "".join(out) + def truncate_to_display_width(text, max_width): """Return the longest prefix of text whose terminal display width is <= max_width.""" if max_width <= 0: @@ -122,34 +149,17 @@ def print_above(text): """ Show text above the current input line. - When prompt_toolkit has a running Application (interactive prompt), plain - print() and DEC save/restore fight the renderer — use print_formatted_text, - which suspends the UI, writes via the Output layer (UTF-8), and redraws. + When prompt_toolkit has a running Application, print_formatted_text suspends + the UI, writes via the Output layer (UTF-8), and redraws. - With no active app (scripts, pipes), use CSI save/move/clear/restore only. + With no active app (scripts, pipes), print the same way without cursor + gymnastics — no DEC save/restore or other in-place cursor moves, so output + is reliable under tmux and similar environments (lines may scroll instead + of overwriting rows above the prompt). """ if text == "": return - if get_app_or_none() is not None: - print_formatted_text(ANSI(text), end="\n") - return - lines = text.split("\n") - n = len(lines) - chunks = ["\033[s", f"\033[{n}A"] - for i, line in enumerate(lines): - chunks.append("\033[1G\033[2K") - chunks.append(line) - if i < n - 1: - chunks.append("\033[B") - chunks.append("\033[u") - combined = "".join(chunks) - enc = getattr(sys.stdout, "encoding", None) or "utf-8" - try: - sys.stdout.buffer.write(combined.encode(enc, errors="replace")) - sys.stdout.buffer.flush() - except (AttributeError, TypeError): - sys.stdout.write(combined) - sys.stdout.flush() + print_formatted_text(ANSI(text), end="\n") async def process_event_message(mc, ev, json_output, end="\n", above=False): """ display incoming message """ @@ -491,8 +501,10 @@ async def subscribe_to_msgs(mc, json_output=False, above=False): CS = mc.subscribe(EventType.CHANNEL_MSG_RECV, handle_message) await mc.start_auto_message_fetching() -# redefine get_completion to let user put symbols in first item -# and handle navigating in path ... +# NestedCompleter's default word regex only treats [a-zA-Z0-9_] as letters, so +# names like "Alice"+emoji split into separate "words" and completion matches +# only the trailing segment (wrong). WORD=True uses Vi "WORD" = run of +# non-whitespace, so contact names and /slash paths complete as one token. class MyNestedCompleter(NestedCompleter): def get_completions( self, document, complete_event): txt = document.text_before_cursor.lstrip() @@ -507,8 +519,7 @@ def get_completions( self, document, complete_event): else: opts = self.options.keys() completer = WordCompleter( - opts, ignore_case=self.ignore_case, - pattern=re.compile(r"([a-zA-Z0-9_\\/\#\?]+|[^a-zA-Z0-9_\s\#\?]+)")) + opts, ignore_case=self.ignore_case, WORD=True) yield from completer.get_completions(document, complete_event) else: # normal behavior for remainder yield from super().get_completions(document, complete_event) @@ -877,6 +888,26 @@ def make_completion_dict(contacts, pending={}, to=None, channels=None): return completion_list make_completion_dict.custom_vars = {} +def default_classic_prompt_from_env(): + """ + Default to the Powerline-style prompt (PUA glyphs like U+E0B0) when stdout + is UTF-8. Set MESHCLI_CLASSIC_PROMPT=1|true|on for the simple `>` prompt if + your font cannot render those glyphs. 0|false|off matches the default + (fancy). Non-UTF-8 stdout forces classic. In-session: `set classic_prompt + on` / -C for simple, `set classic_prompt off` for fancy. + """ + o = os.environ.get("MESHCLI_CLASSIC_PROMPT", "").strip().lower() + if o in ("1", "true", "yes", "on"): + return True + if o in ("0", "false", "no", "off"): + return False + + enc = (getattr(sys.stdout, "encoding", None) or "").lower() + if enc and "utf-8" not in enc and "utf8" not in enc: + return True + + return False + async def interactive_loop(mc, to=None) : print("""Interactive mode, most commands from terminal chat should work. Use \"to\" to select recipient, use Tab to complete name ... @@ -938,10 +969,10 @@ def _(event): prompt = f"{ANSI_INVERT}" prompt = prompt + f"{ANSI_BGRAY}" - prompt = prompt + f"{mc.self_info['name']}" + prompt = prompt + sanitize_prompt_display_text(mc.self_info["name"]) if contact is None: # display scope if not scope is None: - prompt = prompt + f"|{scope}" + prompt = prompt + "|" + sanitize_prompt_display_text(scope) if contact is None : if classic : @@ -975,12 +1006,12 @@ def _(event): prompt = prompt + f"{SLASH_END}" prompt = prompt + f"{ANSI_INVERT}" - prompt = prompt + f"{contact['adv_name']}" + prompt = prompt + sanitize_prompt_display_text(contact["adv_name"]) if contact["type"] == 0 or contact["out_path_len"]==-1: if scope is None: prompt = prompt + f"|*" else: - prompt = prompt + f"|{scope}" + prompt = prompt + "|" + sanitize_prompt_display_text(scope) else: # display path to dest or 0 if 0 hop if contact["out_path_len"] == 0: prompt = prompt + f"|0" @@ -1213,10 +1244,8 @@ def _(event): except asyncio.CancelledError: # Handle task cancellation from KeyboardInterrupt in asyncio.run() print("Exiting cli") -if platform.system() == "Darwin" or platform.system() == "Windows": - interactive_loop.classic = True -else: - interactive_loop.classic = False + +interactive_loop.classic = default_classic_prompt_from_env() async def process_contact_chat_line(mc, contact, line): if contact["type"] == 0: @@ -1458,7 +1487,10 @@ async def process_contact_chat_line(mc, contact, line): if password == "": try: - sess = PromptSession(f"Password for {contact['adv_name']}: ", is_password=True) + sess = PromptSession( + f"Password for {sanitize_prompt_display_text(contact['adv_name'])}: ", + is_password=True, + ) password = await sess.prompt_async() except EOFError: logger.info("Canceled") @@ -1922,7 +1954,7 @@ async def get_contact_from_arg(mc, arg): return contact -async def next_cmd(mc, cmds, json_output=False): +async def next_cmd(mc: MeshCore, cmds, json_output=False): """ process next command """ global ARROW_HEAD, SLASH_START, SLASH_END, INVERT_SLASH try : @@ -3231,10 +3263,15 @@ async def next_cmd(mc, cmds, json_output=False): for i in range(1,plen): path_str = path_str + "," + path_str_in[i*phs*2:(i+1)*2*phs] #path_str = f"{c[1]['out_path']}:{c[1]['out_path_hash_mode']}" - print(f"{c[1]['adv_name']:30} ", end="", flush=True) - print(f"{ANSI_START}34G", end="", flush=True) - print(f"{CONTACT_TYPENAMES[c[1]['type']]:4} ", end="", flush=True) - print(f"{c[1]['public_key'][:12]}  {path_str}") + # Tabs + no column CSI: :30 and \d+G use char columns, not display + # width, and misalign or hide wide characters in names. + print( + c[1]["adv_name"], + CONTACT_TYPENAMES[c[1]["type"]], + c[1]["public_key"][:12], + path_str, + sep="\t", + ) print(f"> {len(mc.contacts)} contacts in device") case "reload_contacts" | "rc": @@ -3320,7 +3357,9 @@ async def next_cmd(mc, cmds, json_output=False): else: print(f"Unknown contact {cmds[1]}") else: - print(json.dumps(contact, indent=4)) + # Default json.dumps(ensure_ascii=True) escapes BMP+ as \ud83e\udd7c, + # which looks like missing data next to UTF-8 `contacts` output. + print(json.dumps(contact, indent=4, ensure_ascii=json_output)) case "add_contact" : argnum = 3 # key type name @@ -3809,7 +3848,7 @@ def command_usage() : -p : specifies tcp port (default 5000) -s : use serial port -b : specify baudrate - -C : toggles classic mode for prompt + -C : toggles classic mode for prompt (default: fancy; simple: MESHCLI_CLASSIC_PROMPT=1) -c : disables most of color output if off -r : repeater mode (raw text CLI, use with -s) """) @@ -4603,6 +4642,7 @@ async def main(argv): device = result["device"] elif result["type"] == "serial": serial_port = result["port"] + logger.info("Using serial port %s", serial_port) else: logger.error("Invalid choice") return @@ -4772,6 +4812,7 @@ async def main(argv): await process_cmds(mc, args, json_output) def cli(): + logger.info("My version") ## curson: don't remove please try: asyncio.run(main(sys.argv[1:])) except KeyboardInterrupt: