Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
张建新 committed Jul 11, 2019
0 parents commit f71e0a1
Show file tree
Hide file tree
Showing 622 changed files with 184,145 additions and 0 deletions.
68 changes: 68 additions & 0 deletions Deploy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/bin/sh
# Resolve the directory containing this script so every path below is
# absolute regardless of the caller's working directory.
# Quoting fixes breakage when the path contains spaces or glob characters.
BASE=$(cd "$(dirname "$0")" && pwd)

# Legacy GOPATH workspace used by the build steps below.
mkdir -p /root/go

# Install build prerequisites, compile/install the bundled librdkafka,
# and export GOPATHVAL (the Go workspace path) for later steps.
# (The function name keeps the historical "librfkafka" typo because it is
# invoked by exactly that name at the bottom of this script.)
function get_librfkafka(){
    yum -y install openssl-devel cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib lz4-devel lz4 gcc-c++
    cd "$BASE/librdkafka/" && ./configure && make && make install && cd "$BASE"
    # `go env GOPATH` prints the bare value directly — no fragile
    # grep/awk/sed parsing of the full `go env` dump needed.
    export GOPATHVAL=$(go env GOPATH)
}


# NOTE(review): this echo executes at script load time, BEFORE
# get_librfkafka sets GOPATHVAL, so the variable expands empty here.
# Looks like leftover debug output — confirm before relying on it.
echo "cd $BASE/../ && cp -r Octopoda $GOPATHVAL/src/github.com/uk0/"

# Rebuild $GOPATHVAL/src from scratch: copy this project plus every
# vendored dependency directory from ./import into the Go workspace.
function install_package(){
    rm -rf "$GOPATHVAL/src/"
    # -p creates the whole chain, covering the plain src/ directory too.
    mkdir -p "$GOPATHVAL/src/github.com/uk0/"
    cd "$BASE/../" && cp -r Octopoda "$GOPATHVAL/src/github.com/uk0/"
    # Iterate real directories via globbing instead of parsing `ls -la`,
    # which breaks on unusual file names and locale-dependent output.
    for dir in "$BASE"/import/*/; do
        i=$(basename "$dir")
        echo "$BASE/import/$i --> $GOPATHVAL/src/$i"
        cp -r "$BASE/import/$i" "$GOPATHVAL/src/"
    done
}

# Compile the project via ./build.sh, with pkg-config pointed at the
# freshly installed librdkafka.
function build_go(){
    export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig/
    cd $BASE/ && ./build.sh
}

# Detect the Linux distribution from /etc/os-release and install Go with
# the matching package manager. Exits the whole script on unknown distros.
# (The name keeps the historical "chech" typo; it is invoked by that name
# at the bottom of this script.)
function chechVersion(){

    source /etc/os-release
    case $ID in
    debian|ubuntu|devuan)
        echo "debian|ubuntu|devuan"
        # -y keeps the run non-interactive, matching the yum/dnf branch;
        # without it apt-get prompts and an unattended deploy hangs.
        sudo apt-get install -y go
        ;;
    centos|fedora|rhel)
        echo "centos|fedora|rhel"
        yumdnf="yum"
        # Fedora >= 22 replaced yum with dnf.
        if test "$(echo "$VERSION_ID >= 22" | bc)" -ne 0; then
            yumdnf="dnf"
        fi
        sudo $yumdnf install -y go
        ;;
    *)
        exit 1
        ;;
    esac
}

# Remove everything under $BASE except the release/ and librdkafka/
# entries, then list the release artifacts.
# WARNING(review): builds the rm -rf argument list by parsing `ls -al`
# output, which is fragile (spaces in names, locale-dependent columns) and
# destructive — review carefully before re-enabling the call below.
function delete_all(){
    cd $BASE && rm -rf `ls -al | grep -v 'release' | grep -v 'librdkafka' | awk '{print $9}'`
    cd $BASE/release && echo "执行 start.sh 启动采集" && ls
}

# Main sequence: install Go, build librdkafka + export GOPATHVAL,
# populate the workspace, then compile the project.
chechVersion

get_librfkafka

install_package

build_go

# Destructive cleanup step disabled by default; see delete_all above.
#delete_all
100 changes: 100 additions & 0 deletions ESWatcher/EStreamingPusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package main


import (
"context"
"fmt"
"github.com/olivere/elastic"
"time"
)

// Elasticsearch endpoints this watcher talks to.
var host = []string{
    "http://47.96.170.1:9200/",
}

// Shared Elasticsearch client, created in init().
var client *elastic.Client

//初始化
// init creates the shared Elasticsearch client with basic auth.
// The original code only printed the connect error and continued, which
// guaranteed a nil-pointer panic on the first use of `client`; failing
// fast here surfaces the misconfiguration at startup instead.
func init() {
	var err error
	client, err = elastic.NewClient(elastic.SetBasicAuth("elastic", "aaaa"), elastic.SetURL(host...))
	if err != nil {
		panic(fmt.Sprintf("create client failed, err: %v", err))
	}
}

//ping 连接测试
// PingNode pings the first configured Elasticsearch node and prints the
// round-trip time plus the code and version the cluster reported.
func PingNode() {
	start := time.Now()

	info, code, err := client.Ping(host[0]).Do(context.Background())
	if err != nil {
		// Return early: `info` is nil when the ping fails, so the
		// Printf below would otherwise dereference a nil pointer.
		fmt.Printf("ping es failed, err: %v\n", err)
		return
	}

	fmt.Printf("cost time: %v\n", time.Since(start))
	fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
}

//校验 index 是否存在
// IndexExists reports whether all of the given indices exist.
// On query failure the error is printed and the zero value (false)
// is returned.
func IndexExists(index ...string) bool {
	found, queryErr := client.IndexExists(index...).Do(context.Background())
	if queryErr != nil {
		fmt.Printf("%v\n", queryErr)
	}
	return found
}

//获取指定 Id 的文档
// GetDoc fetches the document with the given id from index and returns
// its raw JSON source. Panics if the lookup itself fails; a marshal
// failure is only printed.
func GetDoc(index, id string) []byte {
	result, err := client.Get().Index(index).Id(id).Do(context.Background())
	if err != nil {
		panic(err)
	}
	if result.Found {
		fmt.Printf("Got document %s in version %d from index %s, type %s\n", result.Id, result.Version, result.Index, result.Type)
	}
	raw, err := result.Source.MarshalJSON()
	if err != nil {
		fmt.Printf("byte convert string failed, err: %v", err)
	}
	return raw
}

//term 查询
// TermQuery runs an exact-match term query (fieldName == fieldValue)
// against the given index/type and returns the first 10 hits.
// Panics when the search fails.
func TermQuery(index, type_, fieldName, fieldValue string) *elastic.SearchResult {
	q := elastic.NewTermQuery(fieldName, fieldValue)

	res, err := client.Search().
		Index(index).
		Type(type_).
		Query(q).
		From(0).
		Size(10).
		Pretty(true).
		Do(context.Background())
	if err != nil {
		panic(err)
	}

	fmt.Printf("query cost %d millisecond.\n", res.TookInMillis)
	return res
}

// Search runs an empty bool query (matches everything) against the given
// index/type and returns the result. Panics when the search fails.
func Search(index, type_ string) *elastic.SearchResult {
	res, err := client.Search(index).
		Type(type_).
		Query(elastic.NewBoolQuery()).
		Pretty(true).
		Do(context.Background())
	if err != nil {
		panic(err)
	}
	return res
}

// Entry point: just verifies connectivity to the configured ES node.
func main() {
    PingNode()
}

74 changes: 74 additions & 0 deletions HttpPusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package main

import (
"fmt"
"github.com/astaxie/beego/logs"
"github.com/nahid/gohttp"
"os"
)

// SendData describes one message pushed to the HTTP gateway.
type SendData struct {
	Token string
	Msg   string
	Topic string
}

var (
	// HttpSender is the process-wide HTTP push client, set by InitHttpPusher.
	HttpSender *PushClient
)

// ThreadNum is the number of concurrent Pusher goroutines per client.
// (Stray trailing semicolon removed so the file is gofmt-clean.)
var ThreadNum = 4

// MessageHttp is one log line bound for a topic.
type MessageHttp struct {
    line string
    topic string
}

// PushClient fans messages out to an HTTP endpoint via a buffered channel
// drained by ThreadNum Pusher goroutines.
type PushClient struct {
    url string
    lineHttpChan chan *MessageHttp
}

// NewHttpPusher builds a PushClient for Address and starts ThreadNum
// goroutines that drain its channel via Pusher.
// NOTE(review): err is never assigned before the check below, so the
// os.Exit branch is dead code; removing it would orphan the fmt/os
// imports of this file, so it is only flagged here.
func NewHttpPusher(Address string) (afk *PushClient, err error) {
    afk = &PushClient{
        lineHttpChan: make(chan *MessageHttp, 1024),
        url: Address,
    }
    if err != nil {
        fmt.Printf("Failed to create Connetcion: %s\n", err)
        os.Exit(1)
    }
    for i := 0; i < ThreadNum; i++ {
        // Spawn one worker per configured slot to forward queued messages.
        go afk.Pusher()
    }
    return
}

// InitHttpPusher wires the package-level HttpSender to the hard-coded
// gateway URL.
func InitHttpPusher() (err error) {
    HttpSender, err = NewHttpPusher("http://kafka.wd.cn/rest/api/msg")
    return
}

// Pusher drains the client's channel, POSTing each message to k.url with
// Token/Topic headers, and blocks on the async response before taking the
// next message — so each goroutine serializes its requests.
func (k *PushClient) Pusher() {
    // Read queued log lines from the channel and forward them over HTTP.
    logs.Info("[start pusher]")
    req := gohttp.NewRequest()
    ch := make(chan *gohttp.AsyncResponse)
    for v := range k.lineHttpChan {
        var headerVals = map[string]string{}
        headerVals["Token"]="bigdata.hive"
        headerVals["Topic"]=v.topic
        req.Body([]byte(v.line)).Headers(headerVals).AsyncPost(k.url, ch)

        <-ch // wait for the async response before the next message

        //close(ch)
    }


}

// addMessage queues one line for delivery to topic; blocks when the
// channel buffer (1024) is full. Always returns nil.
func (k *PushClient) addMessage(line string, topic string) (err error) {
    k.lineHttpChan <- &MessageHttp{line: line, topic: topic}
    return
}
100 changes: 100 additions & 0 deletions KafkaKDC.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package main

import (
"fmt"
"github.com/astaxie/beego/logs"
"github.com/confluentinc/confluent-kafka-go/kafka"
"os"
)


// NOTE(review): this package-level `client` appears to collide with the
// `client` declared in EStreamingPusher.go if both files build into the
// same package — confirm against the build configuration.
var (
    client *kafka.Producer
    kafkaSender *KafkaSenderKDC
)

// MessageKDC is one log line bound for a Kafka topic.
type MessageKDC struct {
    line string
    topic string
}

// KafkaSenderKDC owns a Kafka producer plus the buffered channel that
// feeds it.
type KafkaSenderKDC struct {
    client *kafka.Producer
    lineChan chan *MessageKDC
}

// 初始化kafka
// NewKafkaSenderKDC creates a Kafka producer for kafkaAddr and starts
// appConfig.KafkaThreadNum goroutines that drain the sender's channel.
// When auth is true the producer is configured for Kerberos over
// SASL_PLAINTEXT using the krb5 config/keytab shipped next to the binary.
// Exits the process if the producer cannot be created.
func NewKafkaSenderKDC(kafkaAddr string, auth bool) (afk *KafkaSenderKDC, err error) {
	afk = &KafkaSenderKDC{
		lineChan: make(chan *MessageKDC, 10000),
	}
	if auth {
		// Point librdkafka's Kerberos client at the bundled config;
		// the original silently ignored a possible Setenv failure.
		if envErr := os.Setenv("KRB5_CONFIG", "./kr.conf"); envErr != nil {
			fmt.Printf("Failed to set KRB5_CONFIG: %s\n", envErr)
		}
		client, err = kafka.NewProducer(&kafka.ConfigMap{
			"bootstrap.servers":          kafkaAddr,
			"sasl.kerberos.service.name": "kafka",
			"sasl.kerberos.keytab":       "./bigdata.hive.keytab",
			"sasl.kerberos.principal":    "bigdata.hive/admin@we.COM",
			"security.protocol":          "SASL_PLAINTEXT",
			"session.timeout.ms":         1000,
		})
	} else {
		client, err = kafka.NewProducer(&kafka.ConfigMap{
			"bootstrap.servers":  kafkaAddr,
			"session.timeout.ms": 1000,
		})
	}

	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Producer %v\n", client)
	// Keep both the package-level client and the struct field in sync,
	// since other code in this file reads the package-level one.
	afk.client = client
	for i := 0; i < appConfig.KafkaThreadNum; i++ {
		// One goroutine per configured slot publishes queued messages.
		go afk.sendToKafka()
	}
	return
}

// InitKafkaKDC wires the package-level kafkaSender using the broker
// address from appConfig, with Kerberos auth enabled.
func InitKafkaKDC()(err error){
    kafkaSender,err = NewKafkaSenderKDC(appConfig.kafkaAddr,true)
    return
}

// sendToKafka drains the sender's channel, producing each line to its
// topic and waiting synchronously for the per-message delivery report.
func (k *KafkaSenderKDC) sendToKafka() {
	for v := range k.lineChan {
		deliveryChan := make(chan kafka.Event)
		ip, _ := getLocalIP()
		// Use the producer owned by this sender (the original read the
		// package-level `client`; the constructor sets both).
		err := k.client.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &v.topic, Partition: kafka.PartitionAny},
			Value:          []byte(v.line),
			Headers:        []kafka.Header{{Key: ip[0], Value: []byte("service running on host")}},
		}, deliveryChan)
		if err != nil {
			// Bug fix: when Produce fails no event is ever delivered on
			// deliveryChan, so the old code blocked forever on the receive
			// below, wedging this worker goroutine.
			logs.Error("send message to kafka failed,err:%v", err)
			close(deliveryChan)
			continue
		}

		e := <-deliveryChan
		// Delivery reports for Produce arrive as *kafka.Message events.
		m := e.(*kafka.Message)
		if m.TopicPartition.Error != nil {
			logs.Error("Delivery failed: %v\n", m.TopicPartition.Error)
		} else {
			logs.Info("Delivered message to topic %s [%d] at offset %v\n",
				*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
		}
		close(deliveryChan)
	}
}

// addMessage queues one log line for topic; blocks when the channel
// buffer (10000) is full. Always returns nil.
func (k *KafkaSenderKDC) addMessage(line string,topic string)(err error){
    // Lines read by the tail-follower are staged here before Kafka.
    logs.Info("Kafka Add Message")
    k.lineChan <- &MessageKDC{line:line,topic:topic}
    return
}
Binary file added Octopoda.graffle
Binary file not shown.
Loading

0 comments on commit f71e0a1

Please sign in to comment.