Merge branch 'beta'
This commit is contained in:
299
src/buffer.go
299
src/buffer.go
@@ -6,6 +6,8 @@ package src
|
||||
*/
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
@@ -13,6 +15,7 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
@@ -205,7 +208,17 @@ func bufferingStream(playlistID, streamingURL, channelName string, w http.Respon
|
||||
playlist.Streams[streamID] = stream
|
||||
BufferInformation.Store(playlistID, playlist)
|
||||
|
||||
go connectToStreamingServer(streamID, playlistID)
|
||||
switch Settings.Buffer {
|
||||
|
||||
case "xteve":
|
||||
go connectToStreamingServer(streamID, playlistID)
|
||||
case "ffmpeg", "vlc":
|
||||
go thirdPartyBuffer(streamID, playlistID)
|
||||
|
||||
default:
|
||||
break
|
||||
|
||||
}
|
||||
|
||||
showInfo(fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner))
|
||||
|
||||
@@ -400,7 +413,7 @@ func getTmpFiles(stream *ThisStream) (tmpFiles []string) {
|
||||
return
|
||||
}
|
||||
|
||||
if len(files) > 1 {
|
||||
if len(files) > 2 {
|
||||
|
||||
for _, file := range files {
|
||||
|
||||
@@ -460,6 +473,7 @@ func killClientConnection(streamID int, playlistID string, force bool) {
|
||||
if clients.Connection <= 0 {
|
||||
BufferClients.Delete(playlistID + stream.MD5)
|
||||
delete(playlist.Streams, streamID)
|
||||
delete(playlist.Clients, streamID)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -526,6 +540,9 @@ func connectToStreamingServer(streamID int, playlistID string) {
|
||||
var m3u8Segments []string
|
||||
var bandwidth BandwidthCalculation
|
||||
var networkBandwidth = Settings.M3U8AdaptiveBandwidthMBPS * 1e+6
|
||||
// Größe des Buffers
|
||||
var bufferSize = Settings.BufferSize
|
||||
var buffer = make([]byte, 1024*bufferSize*2)
|
||||
|
||||
var defaultSegment = func() {
|
||||
|
||||
@@ -805,8 +822,8 @@ func connectToStreamingServer(streamID int, playlistID string) {
|
||||
var fileSize int
|
||||
|
||||
// Größe des Buffers
|
||||
buffer := make([]byte, 1024*Settings.BufferSize*2)
|
||||
var tmpFileSize = 1024 * Settings.BufferSize * 1
|
||||
buffer = make([]byte, 1024*bufferSize*2)
|
||||
var tmpFileSize = 1024 * bufferSize * 1
|
||||
|
||||
debug = fmt.Sprintf("Buffer Size:%d KB [SERVER CONNECTION]", len(buffer)/1024)
|
||||
showDebug(debug, 3)
|
||||
@@ -882,7 +899,7 @@ func connectToStreamingServer(streamID int, playlistID string) {
|
||||
}
|
||||
|
||||
// Buffer auf die Festplatte speichern
|
||||
if fileSize >= tmpFileSize || n == 0 {
|
||||
if fileSize >= tmpFileSize/2 || n == 0 {
|
||||
|
||||
bandwidth.Stop = time.Now()
|
||||
bandwidth.Size += fileSize
|
||||
@@ -928,6 +945,7 @@ func connectToStreamingServer(streamID int, playlistID string) {
|
||||
}
|
||||
|
||||
fileSize = 0
|
||||
buffer = make([]byte, 1024*bufferSize*2)
|
||||
|
||||
if n == 0 {
|
||||
bufferFile.Close()
|
||||
@@ -1255,12 +1273,6 @@ func parseM3U8(stream *ThisStream) (err error) {
|
||||
break
|
||||
}
|
||||
|
||||
err := checkFile(stream.Folder + "remove")
|
||||
if err == nil {
|
||||
os.RemoveAll(stream.Folder)
|
||||
break
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1319,14 +1331,275 @@ func switchBandwidth(stream *ThisStream) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// Buffer mit FFMPEG
|
||||
func thirdPartyBuffer(streamID int, playlistID string) {
|
||||
|
||||
if p, ok := BufferInformation.Load(playlistID); ok {
|
||||
|
||||
var playlist = p.(Playlist)
|
||||
var debug, path, options, bufferType string
|
||||
var tmpSegment = 1
|
||||
var bufferSize = Settings.BufferSize * 1024
|
||||
var stream = playlist.Streams[streamID]
|
||||
var buf bytes.Buffer
|
||||
var fileSize = 0
|
||||
var streamStatus = make(chan bool)
|
||||
|
||||
var tmpFolder = playlist.Streams[streamID].Folder
|
||||
var url = playlist.Streams[streamID].URL
|
||||
|
||||
stream.Status = false
|
||||
|
||||
bufferType = strings.ToUpper(Settings.Buffer)
|
||||
|
||||
switch Settings.Buffer {
|
||||
|
||||
case "ffmpeg":
|
||||
path = Settings.FFmpegPath
|
||||
options = Settings.FFmpegOptions
|
||||
|
||||
case "vlc":
|
||||
path = Settings.VLCPath
|
||||
options = Settings.VLCOptions
|
||||
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
||||
var addErrorToStream = func(err error) {
|
||||
|
||||
var stream = playlist.Streams[streamID]
|
||||
|
||||
if c, ok := BufferClients.Load(playlistID + stream.MD5); ok {
|
||||
|
||||
var clients = c.(ClientConnection)
|
||||
clients.Error = err
|
||||
BufferClients.Store(playlistID+stream.MD5, clients)
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
os.RemoveAll(getPlatformPath(tmpFolder))
|
||||
|
||||
err := checkFolder(tmpFolder)
|
||||
if err != nil {
|
||||
ShowError(err, 0)
|
||||
addErrorToStream(err)
|
||||
return
|
||||
}
|
||||
|
||||
err = checkFile(path)
|
||||
if err != nil {
|
||||
ShowError(err, 0)
|
||||
addErrorToStream(err)
|
||||
return
|
||||
}
|
||||
|
||||
showInfo(fmt.Sprintf("%s path:%s", bufferType, path))
|
||||
showInfo("Streaming URL:" + stream.URL)
|
||||
|
||||
var tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
|
||||
|
||||
f, err := os.Create(tmpFile)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
addErrorToStream(err)
|
||||
return
|
||||
}
|
||||
|
||||
var args = strings.Replace(options, "[URL]", url, -1)
|
||||
var cmd = exec.Command(path, strings.Split(args, " ")...)
|
||||
|
||||
debug = fmt.Sprintf("%s:%s %s", bufferType, path, args)
|
||||
showDebug(debug, 1)
|
||||
|
||||
// Byte-Daten vom Prozess
|
||||
stdOut, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
ShowError(err, 0)
|
||||
cmd.Process.Kill()
|
||||
cmd.Wait()
|
||||
addErrorToStream(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Log-Daten vom Prozess
|
||||
logOut, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
ShowError(err, 0)
|
||||
cmd.Process.Kill()
|
||||
cmd.Wait()
|
||||
addErrorToStream(err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(buf.Bytes()) == 0 && stream.Status == false {
|
||||
showInfo(bufferType + ":Processing data")
|
||||
}
|
||||
|
||||
cmd.Start()
|
||||
defer cmd.Wait()
|
||||
|
||||
go func() {
|
||||
|
||||
// Log Daten vom Prozess im Dubug Mode 1 anzeigen.
|
||||
scanner := bufio.NewScanner(logOut)
|
||||
scanner.Split(bufio.ScanLines)
|
||||
|
||||
for scanner.Scan() {
|
||||
|
||||
debug = fmt.Sprintf("%s log:%s", bufferType, strings.TrimSpace(scanner.Text()))
|
||||
|
||||
select {
|
||||
case <-streamStatus:
|
||||
showDebug(debug, 1)
|
||||
default:
|
||||
showInfo(debug)
|
||||
}
|
||||
|
||||
time.Sleep(time.Duration(10) * time.Millisecond)
|
||||
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
f, err = os.OpenFile(tmpFile, os.O_APPEND|os.O_WRONLY, 0600)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
buffer := make([]byte, 1024*4)
|
||||
|
||||
reader := bufio.NewReader(stdOut)
|
||||
|
||||
t := make(chan int)
|
||||
|
||||
go func() {
|
||||
|
||||
var timeout = 0
|
||||
for {
|
||||
time.Sleep(time.Duration(1000) * time.Millisecond)
|
||||
timeout++
|
||||
|
||||
select {
|
||||
case <-t:
|
||||
return
|
||||
default:
|
||||
t <- timeout
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
for {
|
||||
|
||||
select {
|
||||
case timeout := <-t:
|
||||
if timeout >= 20 && tmpSegment == 1 {
|
||||
cmd.Process.Kill()
|
||||
err = errors.New("Timout")
|
||||
ShowError(err, 4006)
|
||||
addErrorToStream(err)
|
||||
cmd.Wait()
|
||||
f.Close()
|
||||
return
|
||||
}
|
||||
|
||||
default:
|
||||
|
||||
}
|
||||
|
||||
if fileSize == 0 && stream.Status == false {
|
||||
showInfo("Streaming Status:Receive data from " + bufferType)
|
||||
}
|
||||
|
||||
if clientConnection(stream) == false {
|
||||
cmd.Process.Kill()
|
||||
f.Close()
|
||||
cmd.Wait()
|
||||
return
|
||||
}
|
||||
|
||||
n, err := reader.Read(buffer)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
fileSize = fileSize + len(buffer[:n])
|
||||
|
||||
if _, err := f.Write(buffer[:n]); err != nil {
|
||||
cmd.Process.Kill()
|
||||
ShowError(err, 0)
|
||||
addErrorToStream(err)
|
||||
cmd.Wait()
|
||||
f.Close()
|
||||
return
|
||||
}
|
||||
|
||||
if fileSize >= bufferSize/2 {
|
||||
|
||||
if tmpSegment == 1 && stream.Status == false {
|
||||
close(t)
|
||||
close(streamStatus)
|
||||
showInfo(fmt.Sprintf("Streaming Status:Buffering data from %s", bufferType))
|
||||
}
|
||||
|
||||
f.Close()
|
||||
tmpSegment++
|
||||
|
||||
if stream.Status == false {
|
||||
stream.Status = true
|
||||
playlist.Streams[streamID] = stream
|
||||
BufferInformation.Store(playlistID, playlist)
|
||||
}
|
||||
|
||||
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
|
||||
|
||||
fileSize = 0
|
||||
|
||||
var errCreate, errOpen error
|
||||
f, errCreate = os.Create(tmpFile)
|
||||
f, errOpen = os.OpenFile(tmpFile, os.O_APPEND|os.O_WRONLY, 0600)
|
||||
if errCreate != nil || errOpen != nil {
|
||||
cmd.Process.Kill()
|
||||
ShowError(err, 0)
|
||||
addErrorToStream(err)
|
||||
cmd.Wait()
|
||||
f.Close()
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
cmd.Process.Kill()
|
||||
cmd.Wait()
|
||||
|
||||
err = errors.New(bufferType + " error")
|
||||
addErrorToStream(err)
|
||||
ShowError(err, 1204)
|
||||
|
||||
time.Sleep(time.Duration(500) * time.Millisecond)
|
||||
clientConnection(stream)
|
||||
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func getTuner(id, playlistType string) (tuner int) {
|
||||
|
||||
switch Settings.Buffer {
|
||||
|
||||
case false:
|
||||
case "-":
|
||||
tuner = Settings.Tuner
|
||||
|
||||
case true:
|
||||
case "xteve", "ffmpeg", "vlc":
|
||||
|
||||
i, err := strconv.Atoi(getProviderParameter(id, playlistType, "tuner"))
|
||||
if err == nil {
|
||||
|
||||
Reference in New Issue
Block a user