这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ bin/portal-tunnel expose --config config.yaml
2. **Expose a single service directly**

```bash
bin/portal-tunnel expose --relay <url> --host localhost --port 8080 --name <service>
bin/portal-tunnel expose --relay <url> [--relay <url> ...] --host localhost --port 8080 --name <service>
```

## Glossary
Expand Down
13 changes: 9 additions & 4 deletions cmd/portal-tunnel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"gopkg.in/yaml.v3"
)

var defaultProtocols = []string{"http/1.1", "h2"}

// RelayConfig describes a named relay endpoint and its bootstrap URLs.
type RelayConfig struct {
Name string `yaml:"name"`
Expand Down Expand Up @@ -171,10 +173,13 @@ func (cfg *TunnelConfig) validate() error {
}

func (cfg *TunnelConfig) applyDefaults() {
const defaultProtocol = "http/1.1"
for i := range cfg.Services {
if len(cfg.Services[i].Protocols) == 0 {
cfg.Services[i].Protocols = []string{defaultProtocol}
}
applyServiceDefaults(&cfg.Services[i])
}
}

func applyServiceDefaults(svc *ServiceConfig) {
if len(svc.Protocols) == 0 {
svc.Protocols = append([]string(nil), defaultProtocols...)
}
}
168 changes: 39 additions & 129 deletions cmd/portal-tunnel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,19 @@ import (
"strings"
"sync"
"syscall"
"time"

"github.com/rs/zerolog/log"
"gosuda.org/portal/sdk"
)

var (
flagConfigPath string
flagService string
flagRelayURL string
flagRelayURLs string
flagHost string
flagPort string
flagName string
)

type serviceContext struct {
Name string
LocalAddr string
RelayServers []string
}

func main() {
if len(os.Args) < 2 {
printTunnelUsage()
Expand All @@ -42,8 +34,7 @@ func main() {
case "expose":
fs := flag.NewFlagSet("expose", flag.ExitOnError)
fs.StringVar(&flagConfigPath, "config", "", "Path to portal-tunnel config file")
fs.StringVar(&flagService, "service", "", "Specific service name to expose (defaults to first entry)")
fs.StringVar(&flagRelayURL, "relay", "ws://localhost:4017/relay", "Portal relay server URL when config is not provided")
fs.StringVar(&flagRelayURLs, "relay", "ws://localhost:4017/relay", "Portal relay server URLs when config is not provided (comma-separated)")
fs.StringVar(&flagHost, "host", "localhost", "Local host to proxy to when config is not provided")
fs.StringVar(&flagPort, "port", "4018", "Local port to proxy to when config is not provided")
fs.StringVar(&flagName, "name", "", "Service name when config is not provided (auto-generated if empty)")
Expand All @@ -65,8 +56,8 @@ func printTunnelUsage() {
fmt.Println("portal-tunnel — Expose local services through Portal relay")
fmt.Println()
fmt.Println("Usage:")
fmt.Println(" portal-tunnel expose --config <file> [--service <name>]")
fmt.Println(" portal-tunnel expose [--relay URL] [--host HOST] [--port PORT] [--name NAME]")
fmt.Println(" portal-tunnel expose --config <file>")
fmt.Println(" portal-tunnel expose [--relay URL1,URL2] [--host HOST] [--port PORT] [--name NAME]")
}

func runExpose() error {
Expand All @@ -81,11 +72,6 @@ func runExposeWithConfig() error {
if err != nil {
return fmt.Errorf("load config: %w", err)
}
services, err := selectServices(cfg, flagService)
if err != nil {
return err
}

relayDir := NewRelayDirectory(cfg.Relays)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -100,11 +86,11 @@ func runExposeWithConfig() error {
cancel()
}()

errCh := make(chan error, len(services))
errCh := make(chan error, len(cfg.Services))
var wg sync.WaitGroup

for _, svc := range services {
service := svc
for i := range cfg.Services {
service := &cfg.Services[i]
wg.Add(1)
go func() {
defer wg.Done()
Expand Down Expand Up @@ -135,32 +121,23 @@ func runExposeWithConfig() error {
}

func runExposeWithFlags() error {
relayURL := strings.TrimSpace(flagRelayURL)
if relayURL == "" {
return fmt.Errorf("--relay is required when --config is not provided")
relayURLs := parseCommaSeparatedURLs(flagRelayURLs)
if len(relayURLs) == 0 {
return fmt.Errorf("--relay must include at least one non-empty URL when --config is not provided")
}

host := strings.TrimSpace(flagHost)
if host == "" {
host = "localhost"
}
port := strings.TrimSpace(flagPort)
if port == "" {
return fmt.Errorf("--port is required when --config is not provided")
}

target := net.JoinHostPort(host, port)
target := net.JoinHostPort(flagHost, flagPort)
service := &ServiceConfig{
Name: strings.TrimSpace(flagName),
Target: target,
Protocols: []string{"http/1.1", "h2"},
RelayPreference: []string{"flags"},
}
applyServiceDefaults(service)

relayDir := NewRelayDirectory([]RelayConfig{
{
Name: "flags",
URLs: []string{relayURL},
URLs: relayURLs,
},
})

Expand All @@ -184,49 +161,39 @@ func runExposeWithFlags() error {
return nil
}

func proxyConnection(ctx context.Context, svcCtx *serviceContext, relayConn net.Conn, connNum int) error {
func proxyConnection(ctx context.Context, localAddr string, relayConn net.Conn) error {
defer relayConn.Close()

// Connect to local service
localConn, err := net.Dial("tcp", svcCtx.LocalAddr)
localConn, err := net.Dial("tcp", localAddr)
if err != nil {
return fmt.Errorf("failed to connect to local service %s: %w", svcCtx.LocalAddr, err)
return fmt.Errorf("failed to connect to local service %s: %w", localAddr, err)
}
defer localConn.Close()

// Bidirectional copy
errCh := make(chan error, 2)
cancelCopy := make(chan struct{})
stopCh := make(chan struct{})
go func() {
select {
case <-ctx.Done():
relayConn.Close()
localConn.Close()
case <-cancelCopy:
case <-stopCh:
}
}()

// Relay -> Local
go func() {
_, err := io.Copy(localConn, relayConn)
errCh <- err
}()

// Local -> Relay
go func() {
_, err := io.Copy(relayConn, localConn)
errCh <- err
}()

// Wait for one direction to finish
err = <-errCh

// Close both connections to stop the other goroutine
close(stopCh)
relayConn.Close()
localConn.Close()
close(cancelCopy)

// Wait for other goroutine
<-errCh

return err
Expand All @@ -246,18 +213,7 @@ func runServiceTunnel(ctx context.Context, relayDir *RelayDirectory, service *Se
serviceName = fmt.Sprintf("tunnel-%s", leaseID[:8])
log.Info().Str("service", serviceName).Msg("No service name provided; generated automatically")
}
svcCtx := &serviceContext{
Name: serviceName,
LocalAddr: localAddr,
RelayServers: bootstrapServers,
}

log.Info().Str("service", serviceName).Msgf("Waiting for local service at %s (interval=%v)...", localAddr, time.Second)
if err := waitForLocalService(localAddr, 0, time.Second); err != nil {
return fmt.Errorf("service %s: %w", serviceName, err)
}
log.Info().Str("service", serviceName).Msgf("✓ Local service is reachable at %s", localAddr)

log.Info().Str("service", serviceName).Msgf("Local service is reachable at %s", localAddr)
log.Info().Str("service", serviceName).Msgf("Starting Portal Tunnel (%s)...", origin)
log.Info().Str("service", serviceName).Msgf(" Local: %s", localAddr)
log.Info().Str("service", serviceName).Msgf(" Relays: %s", strings.Join(bootstrapServers, ", "))
Expand All @@ -283,12 +239,11 @@ func runServiceTunnel(ctx context.Context, relayDir *RelayDirectory, service *Se
}()

log.Info().Str("service", serviceName).Msg("")
log.Info().Str("service", serviceName).Msg("=== Service is now publicly accessible ===")
log.Info().Str("service", serviceName).Msg("Access via:")
log.Info().Str("service", serviceName).Msgf("- Name: /peer/%s", serviceName)
log.Info().Str("service", serviceName).Msgf("- Lease ID: /peer/%s", leaseID)
relayHost := extractHost(bootstrapServers[0])
log.Info().Str("service", serviceName).Msgf("- Example: http://%s/peer/%s", relayHost, serviceName)
log.Info().Str("service", serviceName).Msgf("- Example: http://%s/peer/%s", bootstrapServers[0], serviceName)

log.Info().Str("service", serviceName).Msg("")

connCount := 0
Expand All @@ -313,79 +268,34 @@ func runServiceTunnel(ctx context.Context, relayDir *RelayDirectory, service *Se
}

connCount++
currentConnCount := connCount
log.Info().Str("service", serviceName).Msgf("→ [#%d] New connection from %s", currentConnCount, relayConn.RemoteAddr())
log.Info().Str("service", serviceName).Msgf("→ [#%d] New connection from %s", connCount, relayConn.RemoteAddr())

connWG.Add(1)
go func(relayConn net.Conn, connNum int) {
go func(relayConn net.Conn) {
defer connWG.Done()
if err := proxyConnection(ctx, svcCtx, relayConn, connNum); err != nil {
log.Error().Str("service", serviceName).Err(err).Int("conn", connNum).Msg("Proxy error")
if err := proxyConnection(ctx, localAddr, relayConn); err != nil {
log.Error().Str("service", serviceName).Err(err).Msg("Proxy error")
}
log.Info().Str("service", serviceName).Msgf("← [#%d] Connection closed", connNum)
}(relayConn, currentConnCount)
log.Info().Str("service", serviceName).Msg("Connection closed")
}(relayConn)
}
}

func selectServices(cfg *TunnelConfig, name string) ([]*ServiceConfig, error) {
if len(cfg.Services) == 0 {
return nil, fmt.Errorf("config has no services")
}
if name == "" {
services := make([]*ServiceConfig, len(cfg.Services))
for i := range cfg.Services {
services[i] = &cfg.Services[i]
}
return services, nil
}
for i := range cfg.Services {
if cfg.Services[i].Name == name {
return []*ServiceConfig{&cfg.Services[i]}, nil
}
func parseCommaSeparatedURLs(raw string) []string {
raw = strings.TrimSpace(raw)
if raw == "" {
return nil
}
return nil, fmt.Errorf("service %q not found in config", name)
}

func extractHost(wsURL string) string {
// Simple extraction: ws://host:port/path -> host:port
// Remove ws:// or wss://
host := wsURL
if len(host) > 5 && host[:5] == "ws://" {
host = host[5:]
} else if len(host) > 6 && host[:6] == "wss://" {
host = host[6:]
}
parts := strings.Split(raw, ",")
out := make([]string, 0, len(parts))

// Remove path
if idx := len(host); idx > 0 {
for i, c := range host {
if c == '/' {
idx = i
break
}
for _, p := range parts {
p = strings.TrimSpace(p)
if p != "" {
out = append(out, p)
}
host = host[:idx]
}

return host
}

// waitForLocalService tries to connect repeatedly until success or timeout.
// If timeout == 0, it waits indefinitely.
func waitForLocalService(localAddr string, timeout, interval time.Duration) error {
deadline := time.Time{}
if timeout > 0 {
deadline = time.Now().Add(timeout)
}
for {
conn, err := net.DialTimeout("tcp", localAddr, 2*time.Second)
if err == nil {
conn.Close()
return nil
}
if !deadline.IsZero() && time.Now().After(deadline) {
return fmt.Errorf("timeout waiting for local service at %s: %w", localAddr, err)
}
time.Sleep(interval)
}
return out
}
Loading