From d5948acf39eecf7fbf22e8f0146853bc0e7ec2cb Mon Sep 17 00:00:00 2001 From: NianJiuZst <3235467914@qq.com> Date: Sun, 10 May 2026 16:38:12 +0800 Subject: [PATCH 1/2] fix: preserve existing downloads on failure --- src/files/download.ts | 30 +++++++++------ test/files/download.test.ts | 74 +++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 11 deletions(-) create mode 100644 test/files/download.test.ts diff --git a/src/files/download.ts b/src/files/download.ts index 5814023..e34be1f 100644 --- a/src/files/download.ts +++ b/src/files/download.ts @@ -1,4 +1,4 @@ -import { createWriteStream, unlinkSync } from 'fs'; +import { createWriteStream, renameSync, unlinkSync } from 'fs'; import { createProgressBar } from '../output/progress'; import { CLIError } from '../errors/base'; import { ExitCode } from '../errors/codes'; @@ -41,12 +41,14 @@ export async function downloadFile( const reader = res.body?.getReader(); if (!reader) throw new CLIError('No response body', ExitCode.GENERAL); - const writer = createWriteStream(destPath); + const tmpPath = `${destPath}.tmp-${process.pid}-${Date.now()}-${attempt}-${Math.random().toString(36).slice(2)}`; + const writer = createWriteStream(tmpPath); const progress = contentLength > 0 && !opts?.quiet ? createProgressBar(contentLength, 'Downloading') : null; let received = 0; + let readComplete = false; let completed = false; try { @@ -67,22 +69,28 @@ export async function downloadFile( received += value.byteLength; progress?.update(received); } - completed = true; + readComplete = true; } finally { reader.releaseLock(); progress?.finish(); - await new Promise((resolve, reject) => { - writer.on('finish', resolve); - writer.on('error', reject); - writer.end(); - }); - - if (!completed) { - try { unlinkSync(destPath); } catch { /* best effort */ } + try { + if (!writer.destroyed) { + await new Promise((resolve, reject) => { + writer.on('finish', resolve); + writer.on('error', reject); + writer.end(); + }); + completed = readComplete; + } + } finally { + if (!completed) { + try { unlinkSync(tmpPath); } catch { /* best effort */ } + } } } + renameSync(tmpPath, destPath); return { size: received }; } catch (err) { lastError = err instanceof Error ? err : new Error(String(err)); diff --git a/test/files/download.test.ts b/test/files/download.test.ts new file mode 100644 index 0000000..785afe8 --- /dev/null +++ b/test/files/download.test.ts @@ -0,0 +1,74 @@ +import { afterEach, describe, expect, it } from 'bun:test'; +import { existsSync, mkdtempSync, readdirSync, readFileSync, rmSync, writeFileSync } from 'fs'; +import { tmpdir } from 'os'; +import { join } from 'path'; +import { downloadFile } from '../../src/files/download'; + +const originalFetch = globalThis.fetch; +const tempDirs: string[] = []; + +function makeTempDir(): string { + const dir = mkdtempSync(join(tmpdir(), 'mmx-download-test-')); + tempDirs.push(dir); + return dir; +} + +afterEach(() => { + globalThis.fetch = originalFetch; + for (const dir of tempDirs.splice(0)) { + rmSync(dir, { recursive: true, force: true }); + } +}); + +describe('downloadFile', () => { + it('keeps an existing destination intact when the download stream fails', async () => { + const dir = makeTempDir(); + const destPath = join(dir, 'video.mp4'); + writeFileSync(destPath, 'original'); + + globalThis.fetch = (async () => { + let sentChunk = false; + const stream = new ReadableStream({ + pull(controller) { + if (!sentChunk) { + sentChunk = true; + controller.enqueue(new TextEncoder().encode('partial')); + return; + } + controller.error(new Error('stream failed')); + }, + }); + + return new Response(stream, { + status: 200, + headers: { 'content-length': '100' }, + }); + }) as unknown as typeof fetch; + + await expect( + downloadFile('https://example.com/video.mp4', destPath, { quiet: true, retries: 0 }), + ).rejects.toThrow('Download failed'); + + expect(readFileSync(destPath, 'utf-8')).toBe('original'); + expect(readdirSync(dir)).toEqual(['video.mp4']); + }); + + it('replaces the destination only after a successful download', async () => { + const dir = makeTempDir(); + const destPath = join(dir, 'video.mp4'); + writeFileSync(destPath, 'original'); + + globalThis.fetch = (async () => new Response(new TextEncoder().encode('new'), { + status: 200, + headers: { 'content-length': '3' }, + })) as unknown as typeof fetch; + + await expect( + downloadFile('https://example.com/video.mp4', destPath, { quiet: true, retries: 0 }), + ).resolves.toEqual({ size: 3 }); + + expect(readFileSync(destPath, 'utf-8')).toBe('new'); + expect(existsSync(destPath)).toBe(true); + expect(readdirSync(dir)).toEqual(['video.mp4']); + }); +}); From 9cc128962e9866012a8929b1792d0065b0fff344 Mon Sep 17 00:00:00 2001 From: NianJiuZst <3235467914@qq.com> Date: Sun, 10 May 2026 16:39:31 +0800 Subject: [PATCH 2/2] fix: keep stream json output valid --- src/commands/text/chat.ts | 16 +++-- test/commands/text/chat.test.ts | 115 +++++++++++++++++++++++++++++++- 2 files changed, 125 insertions(+), 6 deletions(-) diff --git a/src/commands/text/chat.ts b/src/commands/text/chat.ts index 1a79c2b..9d6184b 100644 --- a/src/commands/text/chat.ts +++ b/src/commands/text/chat.ts @@ -127,8 +127,12 @@ export default defineCommand({ const model = (flags.model as string) || config.defaultTextModel || 'MiniMax-M2.7'; - const shouldStream = flags.stream === true || (flags.stream === undefined && process.stdout.isTTY); const format = detectOutputFormat(config.output); + const shouldStream = flags.stream === true || ( + flags.stream === undefined + && format !== 'json' + && process.stdout.isTTY + ); const body: ChatRequest = { model, @@ -190,11 +194,12 @@ export default defineCommand({ let inThinking = false; const dim = config.noColor ? '' : '\x1b[2m'; const reset = config.noColor ? '' : '\x1b[0m'; + const isJsonOutput = format === 'json'; const isTTY = process.stdout.isTTY; // In TTY mode, write thinking/response headers to stdout for display. // In non-TTY (pipe/agent) mode, write everything but final text to stderr. - const statusOut = isTTY ? process.stdout : process.stderr; - const resultOut = process.stdout; + const statusOut = isTTY && !isJsonOutput ? process.stdout : process.stderr; + const resultOut = isJsonOutput ? undefined : process.stdout; for await (const event of parseSSE(res)) { if (event.data === '[DONE]') break; @@ -212,7 +217,7 @@ export default defineCommand({ } else if (parsed.type === 'content_block_delta') { if (parsed.delta.type === 'text_delta') { textContent += parsed.delta.text; - resultOut.write(parsed.delta.text); + resultOut?.write(parsed.delta.text); } else if (parsed.delta.type === 'thinking_delta') { statusOut.write(parsed.delta.thinking); } @@ -223,10 +228,11 @@ export default defineCommand({ } } if (inThinking) statusOut.write(reset); - resultOut.write('\n'); if (format === 'json') { console.log(formatOutput({ content: textContent }, format)); + } else { + resultOut?.write('\n'); } } else { const response = await requestJson(config, { diff --git a/test/commands/text/chat.test.ts b/test/commands/text/chat.test.ts index 8748f9b..37ad015 100644 --- a/test/commands/text/chat.test.ts +++ b/test/commands/text/chat.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect, afterEach } from 'bun:test'; -import { createMockServer, jsonResponse, type MockServer } from '../../helpers/mock-server'; +import { createMockServer, jsonResponse, sseResponse, type MockServer } from '../../helpers/mock-server'; import textChatResponse from '../../fixtures/text-chat-response.json'; import type { Config } from '../../../src/config/schema'; @@ -146,6 +146,119 @@ describe('text chat command', () => { } }); + it('does not enable default streaming for json output', async () => { + let requestBody: { stream?: boolean } | undefined; + server = createMockServer({ + routes: { + '/anthropic/v1/messages': async (req) => { + requestBody = await req.json() as { stream?: boolean }; + return jsonResponse(textChatResponse); + }, + }, + }); + + const { default: chatCommand } = await import('../../../src/commands/text/chat'); + + const config: Config = { + apiKey: 'test-key', + region: 'global' as const, + baseUrl: server.url, + output: 'json', + timeout: 10, + verbose: false, + quiet: false, + noColor: true, + yes: false, + dryRun: false, + nonInteractive: true, + async: false, + }; + + const originalIsTTY = process.stdout.isTTY; + const originalLog = console.log; + let output = ''; + Object.defineProperty(process.stdout, 'isTTY', { value: true, configurable: true }); + console.log = (msg: string) => { output += msg; }; + + try { + await chatCommand.execute(config, { + message: ['Hello'], + quiet: false, + verbose: false, + noColor: true, + yes: false, + dryRun: false, + help: false, + nonInteractive: true, + async: false, + }); + + expect(requestBody?.stream).toBe(false); + expect(JSON.parse(output).content[0].text).toBe('Hello! How can I help you today?'); + } finally { + console.log = originalLog; + Object.defineProperty(process.stdout, 'isTTY', { value: originalIsTTY, configurable: true }); + } + }); + + it('emits only final json to stdout for explicit stream json output', async () => { + server = createMockServer({ + routes: { + '/anthropic/v1/messages': () => sseResponse([ + { data: JSON.stringify({ type: 'content_block_delta', delta: { type: 'text_delta', text: 'Hello' } }) }, + { data: JSON.stringify({ type: 'content_block_delta', delta: { type: 'text_delta', text: ' world' } }) }, + ]), + }, + }); + + const { default: chatCommand } = await import('../../../src/commands/text/chat'); + + const config: Config = { + apiKey: 'test-key', + region: 'global' as const, + baseUrl: server.url, + output: 'json', + timeout: 10, + verbose: false, + quiet: false, + noColor: true, + yes: false, + dryRun: false, + nonInteractive: true, + async: false, + }; + + const originalLog = console.log; + const originalWrite = process.stdout.write; + let output = ''; + console.log = (msg: string) => { output += `${msg}\n`; }; + process.stdout.write = ((chunk: string | Uint8Array) => { + output += typeof chunk === 'string' ? chunk : Buffer.from(chunk).toString('utf-8'); + return true; + }) as typeof process.stdout.write; + + try { + await chatCommand.execute(config, { + message: ['Hello'], + stream: true, + quiet: false, + verbose: false, + noColor: true, + yes: false, + dryRun: false, + help: false, + nonInteractive: true, + async: false, + }); + + expect(output).toBe('{\n "content": "Hello world"\n}\n'); + expect(JSON.parse(output).content).toBe('Hello world'); + } finally { + console.log = originalLog; + process.stdout.write = originalWrite; + } + }); + it('--model flag overrides defaultTextModel', async () => { const { default: chatCommand } = await import('../../../src/commands/text/chat');