Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove old file streaming for modern for wshfs.Read() call
  • Loading branch information
sawka committed Mar 20, 2026
commit 5d0b442887e7b5b7c5e2005553c2a5012af5df8f
27 changes: 12 additions & 15 deletions frontend/app/view/preview/preview-directory.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

import { ContextMenuModel } from "@/app/store/contextmenu";
import { useWaveEnv } from "@/app/waveenv/waveenv";
import { globalStore } from "@/app/store/jotaiStore";
import { TabRpcClient } from "@/app/store/wshrpcutil";
import { useWaveEnv } from "@/app/waveenv/waveenv";
import { checkKeyPressed, isCharacterKeyEvent } from "@/util/keyutil";
import { PLATFORM, PlatformMacOS } from "@/util/platformutil";
import { addOpenMenuItems } from "@/util/previewutil";
Expand Down Expand Up @@ -112,7 +112,6 @@ function DirectoryTable({
newDirectory,
}: DirectoryTableProps) {
const env = useWaveEnv<PreviewEnv>();
const searchActive = useAtomValue(model.directorySearchActive);
const fullConfig = useAtomValue(env.atoms.fullConfigAtom);
const defaultSort = useAtomValue(env.getSettingsKeyAtom("preview:defaultsort")) ?? "name";
const setErrorMsg = useSetAtom(model.errorMsgAtom);
Expand Down Expand Up @@ -587,28 +586,26 @@ function DirectoryPreview({ model }: DirectoryPreviewProps) {
useEffect(
() =>
fireAndForget(async () => {
let entries: FileInfo[];
const entries: FileInfo[] = [];
try {
const file = await env.rpc.FileReadCommand(
TabRpcClient,
{
info: {
path: await model.formatRemoteUri(dirPath, globalStore.get),
},
},
null
);
entries = file.entries ?? [];
if (file?.info && file.info.dir && file.info?.path !== file.info?.dir) {
const remotePath = await model.formatRemoteUri(dirPath, globalStore.get);
const stream = env.rpc.FileListStreamCommand(TabRpcClient, { path: remotePath }, null);
for await (const chunk of stream) {
if (chunk?.fileinfo) {
entries.push(...chunk.fileinfo);
}
}
if (finfo?.dir && finfo?.path !== finfo?.dir) {
entries.unshift({
name: "..",
path: file?.info?.dir,
path: finfo.dir,
isdir: true,
modtime: new Date().getTime(),
mimetype: "directory",
});
}
} catch (e) {
console.error("Directory Read Error", e);
setErrorMsg({
status: "Cannot Read Directory",
text: `${e}`,
Expand Down
1 change: 1 addition & 0 deletions frontend/app/view/preview/previewenv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export type PreviewEnv = WaveEnvSubset<{
ConnEnsureCommand: WaveEnv["rpc"]["ConnEnsureCommand"];
FileInfoCommand: WaveEnv["rpc"]["FileInfoCommand"];
FileReadCommand: WaveEnv["rpc"]["FileReadCommand"];
FileListStreamCommand: WaveEnv["rpc"]["FileListStreamCommand"];
FileWriteCommand: WaveEnv["rpc"]["FileWriteCommand"];
FileMoveCommand: WaveEnv["rpc"]["FileMoveCommand"];
FileDeleteCommand: WaveEnv["rpc"]["FileDeleteCommand"];
Expand Down
10 changes: 0 additions & 10 deletions pkg/remote/fileshare/fsutil/fsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,3 @@ func ReadStreamToFileData(ctx context.Context, readCh <-chan wshrpc.RespOrErrorU
}
return fileData, nil
}

func ReadFileStreamToWriter(ctx context.Context, readCh <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData], writer io.Writer) error {
return ReadFileStream(ctx, readCh, func(finfo wshrpc.FileInfo) {
}, func(entries []*wshrpc.FileInfo) error {
return nil
}, func(data io.Reader) error {
_, err := io.Copy(writer, data)
return err
})
}
50 changes: 41 additions & 9 deletions pkg/remote/fileshare/wshfs/wshfs.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2025, Command Line Inc.
// Copyright 2026, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0

package wshfs
Expand All @@ -7,12 +7,12 @@ import (
"context"
"encoding/base64"
"fmt"
"io"
"log"
"os"
"time"

"github.com/wavetermdev/waveterm/pkg/remote/connparse"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fsutil"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
"github.com/wavetermdev/waveterm/pkg/wshutil"
Expand Down Expand Up @@ -41,22 +41,54 @@ func parseConnection(ctx context.Context, path string) (*connparse.Connection, e
}

func Read(ctx context.Context, data wshrpc.FileData) (*wshrpc.FileData, error) {
if data.Info == nil {
return nil, fmt.Errorf("file info is required")
}
log.Printf("Read: %v", data.Info.Path)
conn, err := parseConnection(ctx, data.Info.Path)
if err != nil {
return nil, err
}
rtnCh := readStream(conn, data)
return fsutil.ReadStreamToFileData(ctx, rtnCh)
}

func readStream(conn *connparse.Connection, data wshrpc.FileData) <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData] {
broker := RpcClient.StreamBroker
if broker == nil {
return nil, fmt.Errorf("stream broker not available")
}
if RpcClientRouteId == "" {
return nil, fmt.Errorf("no route id available")
}
readerRouteId := RpcClientRouteId
writerRouteId := wshutil.MakeConnectionRouteId(conn.Host)
reader, streamMeta := broker.CreateStreamReader(readerRouteId, writerRouteId, 256*1024)
defer reader.Close()
go func() {
<-ctx.Done()
reader.Close()
}()
byteRange := ""
if data.At != nil && data.At.Size > 0 {
byteRange = fmt.Sprintf("%d-%d", data.At.Offset, data.At.Offset+int64(data.At.Size)-1)
}
streamFileData := wshrpc.CommandRemoteStreamFileData{Path: conn.Path, ByteRange: byteRange}
return wshclient.RemoteStreamFileCommand(RpcClient, streamFileData, &wshrpc.RpcOpts{Route: wshutil.MakeConnectionRouteId(conn.Host)})
remoteData := wshrpc.CommandRemoteFileStreamData{
Path: conn.Path,
ByteRange: byteRange,
StreamMeta: *streamMeta,
}
fileInfo, err := wshclient.RemoteFileStreamCommand(RpcClient, remoteData, &wshrpc.RpcOpts{Route: writerRouteId})
if err != nil {
return nil, fmt.Errorf("starting remote file stream: %w", err)
}
var rawData []byte
if fileInfo != nil && !fileInfo.IsDir {
rawData, err = io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("reading file stream: %w", err)
}
}
rtnData := &wshrpc.FileData{Info: fileInfo}
if len(rawData) > 0 {
rtnData.Data64 = base64.StdEncoding.EncodeToString(rawData)
}
return rtnData, nil
}

func GetConnectionRouteId(ctx context.Context, path string) (string, error) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/wshrpc/wshremote/wshremote_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ func (impl *ServerImpl) RemoteListEntriesCommand(ctx context.Context, data wshrp
ch <- wshutil.RespErr[wshrpc.CommandRemoteListEntriesRtnData](err)
return
}
if data.Opts == nil {
data.Opts = &wshrpc.FileListOpts{}
}
innerFilesEntries := []os.DirEntry{}
seen := 0
if data.Opts.Limit == 0 {
Expand Down