Add FFmpeg and VLC support

This commit is contained in:
marmei
2019-09-27 19:24:31 +02:00
parent 11453c6053
commit ad992eb615
21 changed files with 834 additions and 103 deletions

View File

@@ -6,6 +6,8 @@ package src
*/
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
@@ -13,6 +15,7 @@ import (
"net/http"
"net/url"
"os"
"os/exec"
"path"
"sort"
"strconv"
@@ -205,7 +208,17 @@ func bufferingStream(playlistID, streamingURL, channelName string, w http.Respon
playlist.Streams[streamID] = stream
BufferInformation.Store(playlistID, playlist)
go connectToStreamingServer(streamID, playlistID)
switch Settings.Buffer {
case "xteve":
go connectToStreamingServer(streamID, playlistID)
case "ffmpeg", "vlc":
go bufferWithFfmpeg(streamID, playlistID)
default:
break
}
showInfo(fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner))
@@ -400,7 +413,7 @@ func getTmpFiles(stream *ThisStream) (tmpFiles []string) {
return
}
if len(files) > 1 {
if len(files) > 2 {
for _, file := range files {
@@ -526,6 +539,9 @@ func connectToStreamingServer(streamID int, playlistID string) {
var m3u8Segments []string
var bandwidth BandwidthCalculation
var networkBandwidth = Settings.M3U8AdaptiveBandwidthMBPS * 1e+6
// Größe des Buffers
var bufferSize = Settings.BufferSize
var buffer = make([]byte, 1024*bufferSize*2)
var defaultSegment = func() {
@@ -805,8 +821,8 @@ func connectToStreamingServer(streamID int, playlistID string) {
var fileSize int
// Größe des Buffers
buffer := make([]byte, 1024*Settings.BufferSize*2)
var tmpFileSize = 1024 * Settings.BufferSize * 1
buffer = make([]byte, 1024*bufferSize*2)
var tmpFileSize = 1024 * bufferSize * 1
debug = fmt.Sprintf("Buffer Size:%d KB [SERVER CONNECTION]", len(buffer)/1024)
showDebug(debug, 3)
@@ -882,7 +898,7 @@ func connectToStreamingServer(streamID int, playlistID string) {
}
// Buffer auf die Festplatte speichern
if fileSize >= tmpFileSize || n == 0 {
if fileSize >= tmpFileSize/2 || n == 0 {
bandwidth.Stop = time.Now()
bandwidth.Size += fileSize
@@ -928,6 +944,7 @@ func connectToStreamingServer(streamID int, playlistID string) {
}
fileSize = 0
buffer = make([]byte, 1024*bufferSize*2)
if n == 0 {
bufferFile.Close()
@@ -1319,14 +1336,255 @@ func switchBandwidth(stream *ThisStream) (err error) {
return
}
// Buffer mit FFMPEG
func bufferWithFfmpeg(streamID int, playlistID string) {
if p, ok := BufferInformation.Load(playlistID); ok {
var playlist = p.(Playlist)
var debug, path, options, bufferType string
var tmpSegment = 1
var fileSize = 0
var stream = playlist.Streams[streamID]
var buf bytes.Buffer
var tmpFolder = playlist.Streams[streamID].Folder
var url = playlist.Streams[streamID].URL
bufferType = strings.ToUpper(Settings.Buffer)
switch Settings.Buffer {
case "ffmpeg":
path = Settings.FFmpegPath
options = Settings.FFmpegOptions
case "vlc":
path = Settings.VLCPath
options = Settings.VLCOptions
default:
return
}
var addErrorToStream = func(err error) {
var stream = playlist.Streams[streamID]
if c, ok := BufferClients.Load(playlistID + stream.MD5); ok {
var clients = c.(ClientConnection)
clients.Error = err
BufferClients.Store(playlistID+stream.MD5, clients)
}
}
os.RemoveAll(getPlatformPath(tmpFolder))
err := checkFolder(tmpFolder)
if err != nil {
ShowError(err, 0)
addErrorToStream(err)
return
}
showInfo("Streaming URL:" + stream.URL)
// Größe des Buffers
buffer := make([]byte, 1024*Settings.BufferSize*2)
var tmpFileSize = 1024 * Settings.BufferSize * 1
debug = fmt.Sprintf("Buffer Size:%d KB [SERVER CONNECTION]", len(buffer)/1024)
showDebug(debug, 3)
debug = fmt.Sprintf("Buffer Size:%d KB [CLIENT CONNECTION]", tmpFileSize/1024)
showDebug(debug, 3)
var tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
bufferFile, err := os.Create(tmpFile)
if err != nil {
addErrorToStream(err)
bufferFile.Close()
return
}
var args = strings.Replace(options, "[URL]", url, -1)
var cmd = exec.Command(path, strings.Split(args, " ")...)
debug = fmt.Sprintf("%s:%s %s", bufferType, path, args)
showDebug(debug, 1)
// Byte-Daten vom Prozess
stdOut, err := cmd.StdoutPipe()
if err != nil {
ShowError(err, 0)
cmd.Process.Kill()
cmd.Wait()
addErrorToStream(err)
return
}
// Log-Daten vom Prozess
logOut, err := cmd.StderrPipe()
if err != nil {
ShowError(err, 0)
cmd.Process.Kill()
cmd.Wait()
addErrorToStream(err)
return
}
if len(buf.Bytes()) == 0 && stream.Status == false {
showInfo(bufferType + ":Processing data")
}
cmd.Start()
go func() {
// Log Daten vom Prozess im Dubug Mode 2 anzeigen.
scanner := bufio.NewScanner(logOut)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
debug = fmt.Sprintf("%s:%s", bufferType, strings.TrimSpace(scanner.Text()))
showDebug(debug, 1)
}
}()
scanner := bufio.NewScanner(stdOut)
scanner.Split(bufio.ScanBytes)
for scanner.Scan() {
if len(buf.Bytes()) == 0 && stream.Status == false {
showInfo("Streaming Status:Receive data from " + bufferType)
}
b := scanner.Bytes()
for _, i := range b {
buf.WriteByte(i)
}
fileSize = fileSize + len(b)
if clientConnection(stream) == false {
cmd.Process.Kill()
bufferFile.Close()
err = os.RemoveAll(stream.Folder)
if err != nil {
ShowError(err, 4005)
}
cmd.Wait()
return
}
if fileSize >= tmpFileSize/2 {
if stream.Status == false {
showInfo("Streaming Status:Buffering data from " + bufferType)
}
if _, err := bufferFile.Write(buf.Bytes()); err != nil {
cmd.Process.Kill()
ShowError(err, 0)
addErrorToStream(err)
cmd.Wait()
return
}
debug = fmt.Sprintf("Buffer Status:Done (%s)", tmpFile)
showDebug(debug, 2)
bufferFile.Close()
stream.Status = true
playlist.Streams[streamID] = stream
BufferInformation.Store(playlistID, playlist)
tmpSegment++
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
if clientConnection(stream) == false {
cmd.Process.Kill()
bufferFile.Close()
err = os.RemoveAll(stream.Folder)
if err != nil {
ShowError(err, 4005)
}
cmd.Wait()
return
}
bufferFile, err = os.Create(tmpFile)
if err != nil {
addErrorToStream(err)
cmd.Process.Kill()
cmd.Wait()
return
}
buf.Reset()
fileSize = 0
/*
if n == 0 {
bufferFile.Close()
run.Process.Kill()
break
}
*/
}
}
//cmd.Process.Kill()
cmd.Wait()
err = errors.New(bufferType + " error")
addErrorToStream(err)
ShowError(err, 1204)
time.Sleep(time.Duration(1000) * time.Millisecond)
clientConnection(stream)
//os.Exit(0)
return
}
}
func getTuner(id, playlistType string) (tuner int) {
switch Settings.Buffer {
case false:
case "-":
tuner = Settings.Tuner
case true:
case "xteve", "ffmpeg", "vlc":
i, err := strconv.Atoi(getProviderParameter(id, playlistType, "tuner"))
if err == nil {