Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: enroll failed on docker with diffent user process #11

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion generated_metrics_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"infini.sh/framework/core/keystore"
"infini.sh/framework/core/kv"
"infini.sh/framework/core/model"
util2 "infini.sh/framework/core/util"
"infini.sh/framework/lib/go-ucfg"
"infini.sh/framework/modules/configs/config"
"os"
Expand Down Expand Up @@ -42,7 +43,7 @@ func generatedMetricsTasksConfig() error {
if port == "" {
port = "9200" //k8s easysearch port is always 9200
}
endpoint := fmt.Sprintf("%s://127.0.0.1:%s", schema, port)
endpoint := fmt.Sprintf("%s://%s:%s", util2.LocalAddress, schema, port)
v, err := keystore.GetValue("agent_user")
if err != nil {
return fmt.Errorf("get agent_user error: %w", err)
Expand Down
197 changes: 174 additions & 23 deletions lib/process/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ package process

import (
"fmt"
log "github.com/cihub/seelog"
"github.com/shirou/gopsutil/v3/process"
"infini.sh/framework/core/global"
"infini.sh/framework/core/model"
"infini.sh/framework/core/util"
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
)

type FilterFunc func(cmdline string) bool
Expand All @@ -35,31 +40,18 @@ func DiscoverESProcessors(filter FilterFunc) (map[int]model.ProcessInfo, error)
}
if filter(cmdline) {
processName, _ := p.Name()
//k8s easysearch process pid is 1
if p.Pid == 1 {
envPort := os.Getenv("http.port")
port, _ := strconv.Atoi(envPort)
processInfo := model.ProcessInfo{
PID: int(p.Pid),
Name: processName,
Cmdline: cmdline,
ListenAddresses: []model.ListenAddr{
{
IP: util.GetLocalIPs()[0],
Port: port,
},
},
Status: "N/A",
// Handle K8S container when easysearch specific process (pid 1)
allowGenernated := global.Env().SystemConfig.Configs.AllowGeneratedMetricsTasks
if p.Pid == 1 && allowGenernated {
processInfo, err := handleK8sProcess(p, processName, cmdline)
if err != nil {
log.Errorf("Error handling k8s process: %v", err)
} else {
resultProcesses[processInfo.PID] = processInfo
break
}
status, _ := p.Status()
if len(status) > 0 {
processInfo.Status = status[0]
}
processInfo.CreateTime, _ = p.CreateTime()

resultProcesses[processInfo.PID] = processInfo
break
}

connections, err := p.Connections()
if err != nil {
return nil, fmt.Errorf("get process connections error: %w", err)
Expand All @@ -74,6 +66,13 @@ func DiscoverESProcessors(filter FilterFunc) (map[int]model.ProcessInfo, error)
})
}
}

// If no listen addresses found, try to read from /proc/net/tcp and /proc/net/tcp6 files on linux
if len(addresses) == 0 && runtime.GOOS == "linux" {
addresses = readProcTcpServicePorts(p)
addresses = append(addresses)
}

if len(addresses) > 0 {
processInfo := model.ProcessInfo{
PID: int(p.Pid),
Expand All @@ -94,3 +93,155 @@ func DiscoverESProcessors(filter FilterFunc) (map[int]model.ProcessInfo, error)
}
return resultProcesses, nil
}

// handleK8sProcess handles the specific case of processes with pid 1 in kubernetes environment
func handleK8sProcess(p *process.Process, processName string, cmdline string) (model.ProcessInfo, error) {
envPort := os.Getenv("http.port")
port, err := strconv.Atoi(envPort)
if err != nil {
return model.ProcessInfo{}, fmt.Errorf("get env http.port error: %w", err)
}

processInfo := model.ProcessInfo{
PID: int(p.Pid),
Name: processName,
Cmdline: cmdline,
ListenAddresses: []model.ListenAddr{
{
IP: util.GetLocalIPs()[0],
Port: port,
},
},
Status: "N/A",
}
status, _ := p.Status()
if len(status) > 0 {
processInfo.Status = status[0]
}
processInfo.CreateTime, _ = p.CreateTime()
return processInfo, nil
}

// readProcNetFile reads the /proc/net/tcp and /proc/net/tcp6 files and returns the listening addresses
func readProcTcpServicePorts(p *process.Process) []model.ListenAddr {
var listenAddrs []model.ListenAddr
pid := p.Pid
tcpFile := filepath.Join("/proc", strconv.Itoa(int(pid)), "net/tcp")
if _, err := os.Stat(tcpFile); os.IsNotExist(err) {
log.Errorf("skip: tcp file does not exists: %s", tcpFile)
} else if err == nil {
tcpPorts, err := readProcNetFile(tcpFile, p)
if err != nil {
log.Errorf("Error reading tcp file: %s, error: %v", tcpFile, err)
} else {
listenAddrs = append(listenAddrs, tcpPorts...)
}
}

tcp6File := filepath.Join("/proc", strconv.Itoa(int(pid)), "net/tcp6")
if _, err := os.Stat(tcp6File); os.IsNotExist(err) {
log.Errorf("skip: tcp6 file does not exists: %s", tcp6File)
} else if err == nil {
tcp6Ports, err := readProcNetFile(tcp6File, p)
if err != nil {
log.Errorf("Error reading tcp6 file: %s, error: %v", tcp6File, err)
} else {
listenAddrs = append(listenAddrs, tcp6Ports...)
}
}
return listenAddrs
}

func readProcNetFile(file string, p *process.Process) ([]model.ListenAddr, error) {
var listenAddrs []model.ListenAddr
b, err := os.ReadFile(file)
if err != nil {
return listenAddrs, fmt.Errorf("open file error: %w", err)
}

for _, line := range strings.Split(strings.TrimSpace(string(b)), "\n") {
// The format contains whitespace padding (%4d, %5u), so we use
// fmt.Sscanf instead of splitting on whitespace.
var (
sl int
readLocalAddr, readRemoteAddr string
state int
queue, timer string
retransmit int
remoteUID uint
)
// Note that we must use %d where the kernel format uses %4d or %5u:
// - %4d fails to parse for large number of entries (len(sl) > 4)
// - %u is not understood by the fmt package (%U is something else)
// - %5d cuts off longer uids (e.g. 149098 on gLinux)
n, err := fmt.Sscanf(line, "%d: %s %s %02X %s %s %08X %d",
&sl, &readLocalAddr, &readRemoteAddr, &state, &queue, &timer, &retransmit, &remoteUID)
if n != 8 || err != nil {
continue // invalid line (e.g. header line)
}
if state != 0x0A && state != 0x06 { // Only keep listening status, 0A: TCP_LISTEN, 06: TCP6_LISTEN
continue
}
// Parse local address and port with same uid as the process
uids, _ := p.Uids()
if len(uids) > 0 {
uid := int(uids[0])
same := uid == int(remoteUID)
if !same {
continue
}
parseLine(&listenAddrs, readLocalAddr)
}
}
return listenAddrs, nil
}

func parseLine(listenAddr *[]model.ListenAddr, addr string) {
// Parse address and port
addrParts := strings.Split(addr, ":")
if len(addrParts) != 2 {
return
}
hexIP := addrParts[0]
hexPort := addrParts[1]
// Handle IPv6 address
if len(hexIP) > 24 {
hexIP = hexIP[24:]
}
localIP, _ := hexToDecimal(hexIP)
ip := ipv4Ntoa(uint32(localIP))
port, _ := hexToDecimal(hexPort)
// IPv6 address is not supported
if ip == util.LocalIpv6Address {
ip = util.LocalAddress
}

for _, existingAddr := range *listenAddr {
if existingAddr.IP == ip && existingAddr.Port == port {
return
}
}
*listenAddr = append(*listenAddr, model.ListenAddr{
IP: ip,
Port: port,
})
}

// ipv4Ntoa converts uint32 to IPv4 string representation
func ipv4Ntoa(ip uint32) string {
return fmt.Sprintf("%d.%d.%d.%d",
ip>>0&0xFF,
ip>>8&0xFF,
ip>>16&0xFF,
ip>>24&0xFF,
)
}

// hexToDecimal converts hex string to decimal int
func hexToDecimal(hex string) (int, error) {
value, err := strconv.ParseUint(hex, 16, 64)
if err != nil {
return 0, err
}
return int(value), nil
}
2 changes: 1 addition & 1 deletion plugin/elastic/esinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func getPortByPid(pid string) []int {
localAddr := conn.Laddr.IP
localPort := conn.Laddr.Port

if localAddr == "0.0.0.0" {
if localAddr == util.ReservedAddress {
// If listening on 0.0.0.0, it's listening on all interfaces, so include port
listeningPorts = append(listeningPorts, int(localPort))
} else {
Expand Down
Loading