Buffer RTSP performance
This commit is contained in:
186
src/buffer.go
186
src/buffer.go
@@ -473,6 +473,7 @@ func killClientConnection(streamID int, playlistID string, force bool) {
|
||||
if clients.Connection <= 0 {
|
||||
BufferClients.Delete(playlistID + stream.MD5)
|
||||
delete(playlist.Streams, streamID)
|
||||
delete(playlist.Clients, streamID)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1344,9 +1345,11 @@ func bufferWithFfmpeg(streamID int, playlistID string) {
|
||||
var playlist = p.(Playlist)
|
||||
var debug, path, options, bufferType string
|
||||
var tmpSegment = 1
|
||||
var fileSize = 0
|
||||
var bufferSize = Settings.BufferSize * 1024
|
||||
var stream = playlist.Streams[streamID]
|
||||
var buf bytes.Buffer
|
||||
var fileSize = 0
|
||||
var streamStatus = make(chan bool)
|
||||
|
||||
var tmpFolder = playlist.Streams[streamID].Folder
|
||||
var url = playlist.Streams[streamID].URL
|
||||
@@ -1392,25 +1395,13 @@ func bufferWithFfmpeg(streamID int, playlistID string) {
|
||||
|
||||
showInfo("Streaming URL:" + stream.URL)
|
||||
|
||||
// Größe des Buffers
|
||||
buffer := make([]byte, 1024*Settings.BufferSize*2)
|
||||
var tmpFileSize = 1024 * Settings.BufferSize * 1
|
||||
|
||||
debug = fmt.Sprintf("Buffer Size:%d KB [SERVER CONNECTION]", len(buffer)/1024)
|
||||
showDebug(debug, 3)
|
||||
|
||||
debug = fmt.Sprintf("Buffer Size:%d KB [CLIENT CONNECTION]", tmpFileSize/1024)
|
||||
showDebug(debug, 3)
|
||||
|
||||
var tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
|
||||
|
||||
bufferFile, err := os.Create(tmpFile)
|
||||
f, err := os.Create(tmpFile)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
|
||||
addErrorToStream(err)
|
||||
bufferFile.Close()
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
var args = strings.Replace(options, "[URL]", url, -1)
|
||||
@@ -1447,130 +1438,147 @@ func bufferWithFfmpeg(streamID int, playlistID string) {
|
||||
|
||||
go func() {
|
||||
|
||||
// Log Daten vom Prozess im Dubug Mode 2 anzeigen.
|
||||
// Log Daten vom Prozess im Dubug Mode 1 anzeigen.
|
||||
scanner := bufio.NewScanner(logOut)
|
||||
scanner.Split(bufio.ScanLines)
|
||||
|
||||
for scanner.Scan() {
|
||||
|
||||
debug = fmt.Sprintf("%s:%s", bufferType, strings.TrimSpace(scanner.Text()))
|
||||
showDebug(debug, 1)
|
||||
|
||||
select {
|
||||
case <-streamStatus:
|
||||
showDebug(debug, 1)
|
||||
default:
|
||||
showInfo(debug)
|
||||
}
|
||||
|
||||
time.Sleep(time.Duration(10) * time.Millisecond)
|
||||
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
scanner := bufio.NewScanner(stdOut)
|
||||
scanner.Split(bufio.ScanBytes)
|
||||
f, err = os.OpenFile(tmpFile, os.O_APPEND|os.O_WRONLY, 0600)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
for scanner.Scan() {
|
||||
buffer := make([]byte, 1024*4)
|
||||
|
||||
if len(buf.Bytes()) == 0 && stream.Status == false {
|
||||
showInfo("Streaming Status:Receive data from " + bufferType)
|
||||
}
|
||||
reader := bufio.NewReader(stdOut)
|
||||
|
||||
b := scanner.Bytes()
|
||||
t := make(chan int)
|
||||
|
||||
for _, i := range b {
|
||||
buf.WriteByte(i)
|
||||
}
|
||||
go func() {
|
||||
|
||||
fileSize = fileSize + len(b)
|
||||
var timeout = 0
|
||||
for {
|
||||
time.Sleep(time.Duration(100) * time.Millisecond)
|
||||
timeout++
|
||||
|
||||
if clientConnection(stream) == false {
|
||||
|
||||
cmd.Process.Kill()
|
||||
bufferFile.Close()
|
||||
|
||||
err = os.RemoveAll(stream.Folder)
|
||||
if err != nil {
|
||||
ShowError(err, 4005)
|
||||
select {
|
||||
case <-t:
|
||||
return
|
||||
default:
|
||||
t <- timeout
|
||||
}
|
||||
|
||||
cmd.Wait()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if fileSize >= tmpFileSize/2 {
|
||||
}()
|
||||
|
||||
if stream.Status == false {
|
||||
showInfo("Streaming Status:Buffering data from " + bufferType)
|
||||
}
|
||||
|
||||
if _, err := bufferFile.Write(buf.Bytes()); err != nil {
|
||||
for {
|
||||
|
||||
select {
|
||||
case timeout := <-t:
|
||||
if timeout >= 20 && tmpSegment == 1 {
|
||||
cmd.Process.Kill()
|
||||
ShowError(err, 0)
|
||||
err = errors.New("Timout")
|
||||
ShowError(err, 4006)
|
||||
addErrorToStream(err)
|
||||
cmd.Wait()
|
||||
|
||||
f.Close()
|
||||
return
|
||||
}
|
||||
|
||||
debug = fmt.Sprintf("Buffer Status:Done (%s)", tmpFile)
|
||||
showDebug(debug, 2)
|
||||
default:
|
||||
|
||||
bufferFile.Close()
|
||||
}
|
||||
|
||||
if fileSize == 0 && stream.Status == false {
|
||||
showInfo("Streaming Status:Receive data from " + bufferType)
|
||||
}
|
||||
|
||||
if clientConnection(stream) == false {
|
||||
cmd.Process.Kill()
|
||||
f.Close()
|
||||
cmd.Wait()
|
||||
return
|
||||
}
|
||||
|
||||
n, err := reader.Read(buffer)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
fileSize = fileSize + len(buffer[:n])
|
||||
|
||||
if _, err := f.Write(buffer[:n]); err != nil {
|
||||
cmd.Process.Kill()
|
||||
ShowError(err, 0)
|
||||
addErrorToStream(err)
|
||||
cmd.Wait()
|
||||
f.Close()
|
||||
return
|
||||
}
|
||||
|
||||
if fileSize >= bufferSize/2 {
|
||||
|
||||
if tmpSegment == 1 && stream.Status == false {
|
||||
close(t)
|
||||
close(streamStatus)
|
||||
showInfo(fmt.Sprintf("Streaming Status:Buffering data from %s", bufferType))
|
||||
}
|
||||
|
||||
f.Close()
|
||||
tmpSegment++
|
||||
|
||||
stream.Status = true
|
||||
playlist.Streams[streamID] = stream
|
||||
BufferInformation.Store(playlistID, playlist)
|
||||
|
||||
tmpSegment++
|
||||
|
||||
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
|
||||
|
||||
if clientConnection(stream) == false {
|
||||
|
||||
cmd.Process.Kill()
|
||||
bufferFile.Close()
|
||||
|
||||
err = os.RemoveAll(stream.Folder)
|
||||
if err != nil {
|
||||
ShowError(err, 4005)
|
||||
}
|
||||
|
||||
cmd.Wait()
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
bufferFile, err = os.Create(tmpFile)
|
||||
if err != nil {
|
||||
|
||||
addErrorToStream(err)
|
||||
cmd.Process.Kill()
|
||||
cmd.Wait()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
buf.Reset()
|
||||
|
||||
fileSize = 0
|
||||
|
||||
/*
|
||||
if n == 0 {
|
||||
bufferFile.Close()
|
||||
run.Process.Kill()
|
||||
break
|
||||
}
|
||||
*/
|
||||
var errCreate, errOpen error
|
||||
f, errCreate = os.Create(tmpFile)
|
||||
f, errOpen = os.OpenFile(tmpFile, os.O_APPEND|os.O_WRONLY, 0600)
|
||||
if errCreate != nil || errOpen != nil {
|
||||
cmd.Process.Kill()
|
||||
ShowError(err, 0)
|
||||
addErrorToStream(err)
|
||||
cmd.Wait()
|
||||
f.Close()
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//cmd.Process.Kill()
|
||||
cmd.Process.Kill()
|
||||
cmd.Wait()
|
||||
|
||||
err = errors.New(bufferType + " error")
|
||||
addErrorToStream(err)
|
||||
ShowError(err, 1204)
|
||||
|
||||
time.Sleep(time.Duration(1000) * time.Millisecond)
|
||||
time.Sleep(time.Duration(500) * time.Millisecond)
|
||||
clientConnection(stream)
|
||||
|
||||
//os.Exit(0)
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user