Files
xTeVe/src/buffer.go
2020-02-08 11:10:14 +01:00

1747 lines
36 KiB
Go

package src
/*
Tuner-Limit Bild als Video rendern [ffmpeg]
-loop 1 -i stream-limit.jpg -c:v libx264 -t 1 -pix_fmt yuv420p -vf scale=1920:1080 stream-limit.ts
*/
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"os/exec"
"path"
"sort"
"strconv"
"strings"
"time"
)
// createStreamID returns the smallest non-negative integer that is not yet
// used as a key in stream and logs the chosen ID at debug level 1.
func createStreamID(stream map[int]ThisStream) (streamID int) {
	// With len(stream) keys in use, at least one value in 0..len(stream) is free.
	for candidate := 0; candidate <= len(stream); candidate++ {
		if _, taken := stream[candidate]; taken {
			continue
		}
		streamID = candidate
		break
	}

	showDebug(fmt.Sprintf("Streaming Status:Stream ID = %d", streamID), 1)
	return
}
// bufferingStream serves one HTTP client that requested a buffered stream.
//
// The function registers the client with the playlist identified by
// playlistID. If streamingURL is not buffered yet, it starts the buffer
// selected by Settings.Buffer (connectToStreamingServer or thirdPartyBuffer)
// in a goroutine. It then waits until the buffer marks the stream as ready and
// writes the temporary segment files from the stream folder to w until the
// client disconnects, the buffer reports an error, or the stream folder is
// removed.
//
// When the playlist has no free tuner left, the embedded "stream-limit" video
// is sent instead.
func bufferingStream(playlistID, streamingURL, channelName string, w http.ResponseWriter, r *http.Request) {
	time.Sleep(time.Duration(Settings.BufferTimeout) * time.Millisecond)

	var playlist Playlist
	var client ThisClient
	var stream ThisStream
	var streamID int
	var debug string
	var timeOut = 0
	var newStream = true

	//w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Connection", "close")

	// Check whether the playlist is already in use.
	if p, ok := BufferInformation.Load(playlistID); !ok {
		var playlistType string

		// Playlist is not in use yet: create its default values.
		playlist.Folder = System.Folder.Temp + playlistID + string(os.PathSeparator)
		playlist.PlaylistID = playlistID
		playlist.Streams = make(map[int]ThisStream)
		playlist.Clients = make(map[int]ThisClient)

		err := checkFolder(playlist.Folder)
		if err != nil {
			ShowError(err, 000)
			httpStatusError(w, r, 404)
			return
		}

		// The first character of the ID selects the provider type.
		// HasPrefix avoids the slice panic on an empty ID.
		switch {
		case strings.HasPrefix(playlist.PlaylistID, "M"):
			playlistType = "m3u"
		case strings.HasPrefix(playlist.PlaylistID, "H"):
			playlistType = "hdhr"
		}

		playlist.Tuner = getTuner(playlistID, playlistType)
		playlist.PlaylistName = getProviderParameter(playlist.PlaylistID, playlistType, "name")

		// Create the default values for the stream.
		streamID = createStreamID(playlist.Streams)
		client.Connection = 1
		stream.URL = streamingURL
		stream.ChannelName = channelName
		stream.Status = false

		playlist.Streams[streamID] = stream
		playlist.Clients[streamID] = client
		BufferInformation.Store(playlistID, playlist)
	} else {
		// Playlist is already used for streaming.
		// Check whether the URL is already streamed to another client.
		playlist = p.(Playlist)

		for id := range playlist.Streams {
			stream = playlist.Streams[id]
			client = playlist.Clients[id]

			if streamingURL == stream.URL {
				streamID = id
				newStream = false
				client.Connection++

				//playlist.Streams[streamID] = stream
				playlist.Clients[streamID] = client
				BufferInformation.Store(playlistID, playlist)

				debug = fmt.Sprintf("Restream Status:Playlist: %s - Channel: %s - Connections: %d", playlist.PlaylistName, stream.ChannelName, client.Connection)
				showDebug(debug, 1)

				if c, ok := BufferClients.Load(playlistID + stream.MD5); ok {
					var clients = c.(ClientConnection)
					clients.Connection = clients.Connection + 1
					showInfo(fmt.Sprintf("Streaming Status:Channel: %s (Clients: %d)", stream.ChannelName, clients.Connection))
					BufferClients.Store(playlistID+stream.MD5, clients)
				}

				break
			}
		}

		// New stream on a playlist that is already active.
		if newStream {
			// Check whether the playlist allows another stream (tuner limit).
			if len(playlist.Streams) >= playlist.Tuner {
				showInfo(fmt.Sprintf("Streaming Status:Playlist: %s - No new connections available. Tuner = %d", playlist.PlaylistName, playlist.Tuner))

				if value, ok := webUI["html/video/stream-limit.ts"]; ok {
					var content = []byte(GetHTMLString(value.(string)))

					// Headers must be set before WriteHeader, otherwise they are
					// not sent. No Content-Length is declared because the clip
					// is written repeatedly.
					w.Header().Set("Content-type", "video/mpeg")
					w.WriteHeader(200)

					for i := 1; i < 60; i++ {
						if _, err := w.Write(content); err != nil {
							// Client is gone, stop repeating the clip.
							return
						}
						time.Sleep(time.Duration(500) * time.Millisecond)
					}
				}

				return
			}

			// The tuner limit is not reached: create the default values for
			// the additional stream.
			stream = ThisStream{}
			client = ThisClient{}

			streamID = createStreamID(playlist.Streams)
			client.Connection = 1
			stream.URL = streamingURL
			stream.ChannelName = channelName
			stream.Status = false

			playlist.Streams[streamID] = stream
			playlist.Clients[streamID] = client
			BufferInformation.Store(playlistID, playlist)
		}
	}

	// Start a buffer if the stream is not yet played by another client.
	if playlist.Streams[streamID].Status == false && newStream {
		stream = playlist.Streams[streamID]
		stream.MD5 = getMD5(streamingURL)
		stream.Folder = playlist.Folder + stream.MD5 + string(os.PathSeparator)
		stream.PlaylistID = playlistID
		stream.PlaylistName = playlist.PlaylistName

		playlist.Streams[streamID] = stream
		BufferInformation.Store(playlistID, playlist)

		switch Settings.Buffer {
		case "xteve":
			go connectToStreamingServer(streamID, playlistID)
		case "ffmpeg", "vlc":
			go thirdPartyBuffer(streamID, playlistID)
		}

		showInfo(fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner))

		var clients ClientConnection
		clients.Connection = 1
		BufferClients.Store(playlistID+stream.MD5, clients)
	}

	// After this call further changes to the header map are not sent. The
	// Content-Type is therefore left to net/http, which sniffs it from the
	// first bytes written (see the ResponseWriter documentation).
	w.WriteHeader(200)

	for { // Loop 1: wait until the buffer has downloaded the first segment
		p, ok := BufferInformation.Load(playlistID)
		if !ok {
			// The playlist was removed in the meantime; there is nothing left
			// to serve. Returning avoids spinning in this loop without sleep.
			return
		}

		var playlist = p.(Playlist)

		stream, ok := playlist.Streams[streamID]
		if !ok {
			// Stream does not exist (anymore).
			killClientConnection(streamID, playlistID, false)
			showInfo(fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner))
			return
		}

		if stream.Status == false {
			timeOut++
			time.Sleep(time.Duration(100) * time.Millisecond)

			var bufferErr error
			if c, ok := BufferClients.Load(playlistID + stream.MD5); ok {
				bufferErr = c.(ClientConnection).Error
			}

			// Give up on a buffer error or after roughly 20 seconds
			// (200 * 100 ms), even if the client entry has disappeared.
			if bufferErr != nil || timeOut > 200 {
				killClientConnection(streamID, playlistID, false)
				return
			}

			continue
		}

		var oldSegments []string

		for { // Loop 2: temporary files exist, data can be sent to the client
			// Watch the HTTP client connection.
			select {
			case <-r.Context().Done():
				killClientConnection(streamID, playlistID, false)
				return
			default:
			}

			c, ok := BufferClients.Load(playlistID + stream.MD5)
			if !ok {
				return
			}

			if clients := c.(ClientConnection); clients.Error != nil {
				ShowError(clients.Error, 0)
				killClientConnection(streamID, playlistID, false)
				return
			}

			if _, err := os.Stat(stream.Folder); os.IsNotExist(err) {
				killClientConnection(streamID, playlistID, false)
				return
			}

			var tmpFiles = getTmpFiles(&stream)
			//fmt.Println("Buffer Loop:", stream.Connection)

			for _, f := range tmpFiles {
				if _, err := os.Stat(stream.Folder); os.IsNotExist(err) {
					killClientConnection(streamID, playlistID, false)
					return
				}

				oldSegments = append(oldSegments, f)

				var fileName = stream.Folder + f

				file, err := os.Open(fileName)
				if err != nil {
					// Segment cannot be opened, skip it.
					continue
				}

				var writeErr error

				if info, err := file.Stat(); err == nil {
					debug = fmt.Sprintf("Buffer Status:Send to client (%s)", fileName)
					showDebug(debug, 2)

					var buffer = make([]byte, int(info.Size()))

					if _, err = file.Read(buffer); err == nil {
						/*
							// HDHR Header
							w.Header().Set("Cache-Control", "no-cache")
							w.Header().Set("Pragma", "no-cache")
							w.Header().Set("transferMode.dlna.org", "Streaming")
						*/
						_, writeErr = w.Write(buffer)
					}
				}

				// Close explicitly: a defer inside this endless loop would only
				// run when the handler returns and would pile up per segment.
				file.Close()

				if writeErr != nil {
					killClientConnection(streamID, playlistID, false)
					return
				}

				// Keep at most the last segments on disk; delete the oldest one
				// once more than 20 were sent.
				if indexOfString(f, oldSegments) > 20 {
					var fileToRemove = stream.Folder + oldSegments[0]
					os.RemoveAll(getPlatformFile(fileToRemove))
					oldSegments = append(oldSegments[:0], oldSegments[1:]...)
				}
			}

			if len(tmpFiles) == 0 {
				time.Sleep(time.Duration(100) * time.Millisecond)
			}
		} // End of loop 2
	} // End of loop 1
}
// getTmpFiles lists the segment files in stream.Folder that have not been
// returned by an earlier call.
//
// Only entries whose name (without ".ts") parses as a number are considered.
// They are sorted numerically and the highest one is left out; the buffer
// functions in this file create the next numbered file before filling it, so
// the highest number is presumably still being written. Every returned name is
// appended to stream.OldSegments so that it is not returned again.
//
// Nothing is returned while the folder holds two or fewer entries, when the
// folder does not exist, or when it cannot be read.
func getTmpFiles(stream *ThisStream) (tmpFiles []string) {
	var tmpFolder = stream.Folder

	if _, err := os.Stat(tmpFolder); os.IsNotExist(err) {
		return
	}

	files, err := ioutil.ReadDir(getPlatformPath(tmpFolder))
	if err != nil {
		ShowError(err, 0)
		return
	}

	if len(files) <= 2 {
		return
	}

	var fileIDs = make([]float64, 0, len(files))
	for _, file := range files {
		var fileID = strings.Replace(file.Name(), ".ts", "", -1)
		if id, err := strconv.ParseFloat(fileID, 64); err == nil {
			fileIDs = append(fileIDs, id)
		}
	}

	// Without this guard the slice expression below panics when the folder
	// contains entries but none of them has a numeric name.
	if len(fileIDs) == 0 {
		return
	}

	sort.Float64s(fileIDs)

	// Drop the highest numbered segment.
	fileIDs = fileIDs[:len(fileIDs)-1]

	for _, id := range fileIDs {
		var fileName = fmt.Sprintf("%d.ts", int64(id))
		if indexOfString(fileName, stream.OldSegments) == -1 {
			tmpFiles = append(tmpFiles, fileName)
			stream.OldSegments = append(stream.OldSegments, fileName)
		}
	}

	return
}
// killClientConnection unregisters one client from a stream while holding Lock.
//
// With force set, the stream is deleted from the playlist's Streams map right
// away and nothing else is touched. Otherwise the client counter stored in
// BufferClients is decremented; when it drops to zero or below, the counter
// entry and the stream's entries in Streams and Clients are removed. The
// playlist is written back to BufferInformation afterwards.
func killClientConnection(streamID int, playlistID string, force bool) {
	Lock.Lock()
	defer Lock.Unlock()

	entry, found := BufferInformation.Load(playlistID)
	if !found {
		return
	}
	var playlist = entry.(Playlist)

	if force {
		delete(playlist.Streams, streamID)
		showInfo(fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner))
		return
	}

	stream, exists := playlist.Streams[streamID]
	if !exists {
		return
	}

	var clientKey = playlistID + stream.MD5

	if c, registered := BufferClients.Load(clientKey); registered {
		var clients = c.(ClientConnection)
		clients.Connection--
		BufferClients.Store(clientKey, clients)

		showInfo("Streaming Status:Client has terminated the connection")
		showInfo(fmt.Sprintf("Streaming Status:Channel: %s (Clients: %d)", stream.ChannelName, clients.Connection))

		// Last client left: drop the counter and free the stream slot.
		if clients.Connection <= 0 {
			BufferClients.Delete(clientKey)
			delete(playlist.Streams, streamID)
			delete(playlist.Clients, streamID)
		}
	}

	BufferInformation.Store(playlistID, playlist)

	if len(playlist.Streams) > 0 {
		showInfo(fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner))
	}
}
// clientConnection reports whether a client counter for the stream still
// exists in BufferClients. It runs while holding Lock.
//
// When the counter is gone, the stream's temporary folder is removed and, if
// the playlist is still known and has no streams left, the playlist is deleted
// from BufferInformation. In that case false is returned.
func clientConnection(stream ThisStream) (status bool) {
	Lock.Lock()
	defer Lock.Unlock()

	if _, inUse := BufferClients.Load(stream.PlaylistID + stream.MD5); inUse {
		return true
	}

	showDebug(fmt.Sprintf("Streaming Status:Remove temporary files (%s)", stream.Folder), 1)
	showDebug(fmt.Sprintf("Remove tmp folder:%s", stream.Folder), 1)
	os.RemoveAll(stream.Folder)

	entry, known := BufferInformation.Load(stream.PlaylistID)
	if !known {
		return false
	}

	showInfo(fmt.Sprintf("Streaming Status:Channel: %s - No client is using this channel anymore. Streaming Server connection has ended", stream.ChannelName))

	var playlist = entry.(Playlist)
	showInfo(fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner))

	if len(playlist.Streams) <= 0 {
		BufferInformation.Delete(stream.PlaylistID)
	}

	return false
}
// connectToStreamingServer is the built-in buffer (Settings.Buffer "xteve").
//
// It runs as a goroutine for one stream of one playlist. In a loop it requests
// the current segment URL, follows HTTP redirects itself, and handles the
// response by content type:
//
//   - an M3U8 playlist is parsed with parseM3U8, which queues the next segment;
//   - a video stream is written into numbered files ("1.ts", "2.ts", ...) in
//     the stream's temporary folder; a new file is started whenever half of
//     the configured buffer size has been written.
//
// The stream is marked as ready (Status = true) once the first file is
// complete. Errors are attached to the stream's BufferClients entry so that
// waiting clients are terminated. The function returns when clientConnection
// reports that no client is left, on an error, or after ten consecutive
// re-initialisations without received data.
func connectToStreamingServer(streamID int, playlistID string) {
	p, ok := BufferInformation.Load(playlistID)
	if !ok {
		return
	}

	var playlist = p.(Playlist)
	var timeOut = 0
	var debug string
	var tmpSegment = 1
	var tmpFolder = playlist.Streams[streamID].Folder
	var bandwidth BandwidthCalculation
	var networkBandwidth = Settings.M3U8AdaptiveBandwidthMBPS * 1e+6

	// Size of the read buffer. It is allocated once and reused for every read.
	var bufferSize = Settings.BufferSize
	var buffer = make([]byte, 1024*bufferSize*2)

	// defaultSegment resets the stream in the playlist to a single segment
	// pointing at the redirect location (if known) or the stream URL, and
	// counts the re-initialisation in timeOut.
	var defaultSegment = func() {
		var segment Segment

		if len(playlist.Streams[streamID].Location) > 0 {
			segment.URL = playlist.Streams[streamID].Location
		} else {
			segment.URL = playlist.Streams[streamID].URL
		}

		segment.Duration = 0

		var stream = playlist.Streams[streamID]
		stream.Segment = []Segment{}
		stream.Segment = append(stream.Segment, segment)
		stream.HLS = false
		stream.Sequence = 0
		stream.Wait = 0
		stream.NetworkBandwidth = networkBandwidth

		playlist.Streams[streamID] = stream
		timeOut++
	}

	// addErrorToStream stores err in the stream's BufferClients entry.
	var addErrorToStream = func(err error) {
		var stream = playlist.Streams[streamID]

		if c, ok := BufferClients.Load(playlistID + stream.MD5); ok {
			var clients = c.(ClientConnection)
			clients.Error = err
			BufferClients.Store(playlistID+stream.MD5, clients)
		}
	}

	os.RemoveAll(getPlatformPath(tmpFolder))

	err := checkFolder(tmpFolder)
	if err != nil {
		ShowError(err, 0)
		addErrorToStream(err)
		return
	}

	// (Re)start with the default segment.
InitBuffer:
	defaultSegment()

	// timeOut is reset whenever video data is read, so this limit is only
	// reached after ten re-initialisations in a row.
	if timeOut >= 10 {
		return
	}

	var stream ThisStream = playlist.Streams[streamID]

	if stream.Status == false {
		if strings.Index(stream.URL, ".m3u8") != -1 {
			showInfo("Streaming Type:" + "[HLS / M3U8]")
		} else {
			showInfo("Streaming Type:" + "[TS]")
		}

		showInfo("Streaming URL:" + stream.URL)
	}

	stream.TimeStart = time.Now()
	bandwidth.Start = stream.TimeStart
	bandwidth.Size = 0

	for {
		if clientConnection(stream) == false {
			return
		}

		if len(stream.Segment) == 0 || len(stream.URL) == 0 {
			goto InitBuffer
		}

		var segment = stream.Segment[0]
		var currentURL = strings.Trim(segment.URL, "\r\n")

		if len(currentURL) == 0 {
			goto InitBuffer
		}

		debug = fmt.Sprintf("Connection to:%s", currentURL)
		showDebug(debug, 2)

		// Jump target for redirects (301 <---> 308).
	Redirect:
		req, err := http.NewRequest("GET", currentURL, nil)
		if err != nil {
			// An unparsable URL yields a nil request; stop the stream instead
			// of dereferencing it.
			ShowError(err, 0)
			addErrorToStream(err)
			killClientConnection(streamID, playlistID, true)
			clientConnection(stream)
			return
		}

		req.Header.Set("User-Agent", Settings.UserAgent)
		req.Header.Set("Connection", "close")
		//req.Header.Set("Range", "bytes=0-")
		req.Header.Set("Accept", "*/*")

		debugRequest(req)

		client := &http.Client{}

		// Redirects are not followed by the client; the error makes Do return
		// the redirect response so that it can be handled below.
		client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
			return errors.New("Redirect")
		}

		resp, err := client.Do(req)

		if resp != nil && err != nil {
			debugResponse(resp)
		}

		if err != nil {
			if resp == nil {
				err = errors.New("No response from streaming server")
				fmt.Println("Current URL:", currentURL)
				ShowError(err, 0)
				addErrorToStream(err)
				killClientConnection(streamID, playlistID, true)
				clientConnection(stream)
				return
			}

			// A response together with an error only occurs when CheckRedirect
			// failed; net/http has already closed that body, closing it again
			// is harmless.
			resp.Body.Close()

			// Redirect
			if resp.StatusCode >= 301 && resp.StatusCode <= 308 {
				debug = fmt.Sprintf("Streaming Status:HTTP response status [%d] %s", resp.StatusCode, http.StatusText(resp.StatusCode))
				showDebug(debug, 2)

				currentURL = strings.Trim(resp.Header.Get("Location"), "\r\n")
				stream.Location = currentURL

				if len(currentURL) > 0 {
					debug = fmt.Sprintf("HTTP Redirect:%s", stream.Location)
					showDebug(debug, 2)
					goto Redirect
				}

				err = errors.New("Streaming server")
				ShowError(err, 4002)
				addErrorToStream(err)
				return
			}

			ShowError(err, 0)
			addErrorToStream(err)
			return
		}

		// From here on every path closes resp.Body explicitly. A defer would
		// only run when this long-lived function returns and would accumulate
		// one entry per request.

		// Check the HTTP status; on errors the stream is terminated.
		var contentType = resp.Header.Get("Content-Type")
		var httpStatusCode = resp.StatusCode
		var httpStatusInfo = fmt.Sprintf("HTTP Response Status [%d] %s", httpStatusCode, http.StatusText(resp.StatusCode))

		if resp.StatusCode != http.StatusOK {
			showInfo("Content Type:" + contentType)
			showInfo("Streaming Status:" + httpStatusInfo)
			showInfo("Error with this URL:" + currentURL)

			var err = errors.New(http.StatusText(resp.StatusCode))
			ShowError(err, 4004)

			debug = fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner)
			showDebug(debug, 1)

			BufferInformation.Store(playlist.PlaylistID, playlist)
			addErrorToStream(err)
			killClientConnection(streamID, playlistID, true)
			clientConnection(stream)
			resp.Body.Close()
			return
		}

		// Read information about the streaming server.
		if stream.Status == false {
			if len(stream.URLStreamingServer) == 0 {
				if u, err := url.Parse(currentURL); err == nil {
					stream.URLScheme = u.Scheme
					stream.URLHost = req.Host
					stream.URLPath = u.Path
					stream.URLFile = path.Base(u.Path)
					stream.URLRedirect = fmt.Sprintf("%s://%s%s", stream.URLScheme, stream.URLHost, stream.URLPath)
					stream.URLStreamingServer = fmt.Sprintf("%s://%s", stream.URLScheme, stream.URLHost)
				}
			}

			debug = fmt.Sprintf("Server URL:%s", stream.URLStreamingServer)
			showDebug(debug, 1)

			debug = fmt.Sprintf("Temp Folder:%s", tmpFolder)
			showDebug(debug, 1)

			showInfo("Streaming Status:" + "HTTP Response Status [" + strconv.Itoa(resp.StatusCode) + "] " + http.StatusText(resp.StatusCode))
			showInfo("Content Type:" + contentType)
		} else {
			debug = fmt.Sprintf("Content Type:%s", contentType)
			showDebug(debug, 2)
		}

		// Strip parameters from the content type.
		if len(contentType) > 0 {
			var ct = strings.SplitN(contentType, ";", 2)
			contentType = strings.ToLower(ct[0])
		}

		switch contentType {

		// M3U8 playlist
		case "application/x-mpegurl", "application/vnd.apple.mpegurl", "audio/mpegurl", "audio/x-mpegurl":
			body, err := ioutil.ReadAll(resp.Body)
			if err != nil {
				ShowError(err, 0)
				addErrorToStream(err)
			}

			stream.Body = string(body)
			stream.HLS = true
			stream.M3U8URL = currentURL

			err = parseM3U8(&stream)
			if err != nil {
				ShowError(err, 4050)
				addErrorToStream(err)
			}

		// Video stream (TS)
		case "video/mpeg", "video/mp4", "video/mp2t", "video/m2ts", "application/octet-stream", "binary/octet-stream", "application/mp2t":
			var fileSize int

			// A segment file is closed once half of this size was written.
			var tmpFileSize = 1024 * bufferSize * 1

			debug = fmt.Sprintf("Buffer Size:%d KB [SERVER CONNECTION]", len(buffer)/1024)
			showDebug(debug, 3)

			debug = fmt.Sprintf("Buffer Size:%d KB [CLIENT CONNECTION]", tmpFileSize/1024)
			showDebug(debug, 3)

			var tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)

			if clientConnection(stream) == false {
				resp.Body.Close()
				return
			}

			bufferFile, err := os.Create(tmpFile)
			if err != nil {
				addErrorToStream(err)
				resp.Body.Close()
				return
			}

			for {
				if fileSize == 0 {
					debug = fmt.Sprintf("Buffer Status:Buffering (%s)", tmpFile)
					showDebug(debug, 2)
				}

				timeOut = 0

				// Fill the buffer with data from the server.
				n, err := resp.Body.Read(buffer)
				if err != nil && err != io.EOF {
					ShowError(err, 0)
					addErrorToStream(err)
					bufferFile.Close()
					resp.Body.Close()
					return
				}

				if _, err := bufferFile.Write(buffer[:n]); err != nil {
					ShowError(err, 0)
					addErrorToStream(err)
					bufferFile.Close()
					resp.Body.Close()
					return
				}

				fileSize = fileSize + n

				if clientConnection(stream) == false {
					resp.Body.Close()
					bufferFile.Close()

					err = os.RemoveAll(stream.Folder)
					if err != nil {
						ShowError(err, 4005)
					}

					return
				}

				// Segment complete (or no more data): finish the file on disk.
				if fileSize >= tmpFileSize/2 || n == 0 {
					Lock.Lock()

					bandwidth.Stop = time.Now()
					bandwidth.Size += fileSize
					bandwidth.TimeDiff = bandwidth.Stop.Sub(bandwidth.Start).Seconds()

					networkBandwidth = int(float64(bandwidth.Size) / bandwidth.TimeDiff * 1000)
					stream.NetworkBandwidth = networkBandwidth
					bandwidth.NetworkBandwidth = stream.NetworkBandwidth

					debug = fmt.Sprintf("Buffer Status:Done (%s)", tmpFile)
					showDebug(debug, 2)

					bufferFile.Close()

					stream.Status = true
					playlist.Streams[streamID] = stream
					BufferInformation.Store(playlistID, playlist)

					Lock.Unlock()

					tmpSegment++
					tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)

					if clientConnection(stream) == false {
						resp.Body.Close()

						err = os.RemoveAll(stream.Folder)
						if err != nil {
							ShowError(err, 4005)
						}

						return
					}

					bufferFile, err = os.Create(tmpFile)
					if err != nil {
						addErrorToStream(err)
						resp.Body.Close()
						return
					}

					fileSize = 0

					// Nothing was read: the server has finished this response.
					if n == 0 {
						bufferFile.Close()
						resp.Body.Close()
						break
					}
				}
			}

		// Unknown format
		default:
			showInfo("Content Type:" + resp.Header.Get("Content-Type"))
			err = errors.New("Streaming error")
			ShowError(err, 4003)
			addErrorToStream(err)
			resp.Body.Close()
			return
		}

		// Calculate the waiting time before downloading the next segment.
		if stream.HLS == true {
			var sleep float64

			if segment.Duration > 0 {
				stream.TimeEnd = time.Now()
				stream.TimeDiff = stream.TimeEnd.Sub(stream.TimeStart).Seconds()

				// Wait for the remaining segment duration minus a 25 % margin.
				sleep = (segment.Duration - stream.TimeDiff) - (segment.Duration * 0.25)
				if sleep < 0 {
					sleep = 0
				}

				debug = fmt.Sprintf("HLS Status:Download time: %f s | Segment duration: %f s | Sleep: %f s Sequence: %d", stream.TimeDiff, segment.Duration, sleep, segment.Sequence)
				showDebug(debug, 1)

				// Sleep in 100 ms steps so that a removed stream folder ends
				// the wait early.
				for i := 0.0; i < sleep*1000; i = i + 100 {
					time.Sleep(time.Duration(100) * time.Millisecond)

					if _, err := os.Stat(stream.Folder); os.IsNotExist(err) {
						break
					}
				}
			}
		}

		// The current segment is done; continue with the next queued one.
		stream.Segment = stream.Segment[1:len(stream.Segment)]

		resp.Body.Close()
	}
}
// parseM3U8 parses the M3U8 playlist text in stream.Body and queues the next
// item to download in stream.Segment.
//
// A playlist with #EXT-X-STREAM-INF entries fills stream.DynamicStream (keyed by
// bandwidth) and lets switchBandwidth choose the variant playlist. A playlist
// with #EXTINF entries yields media segments; the segment to play is chosen by
// comparing sequence numbers with stream.LastSequence. When no new segment is
// found, the function sleeps for half of the last segment duration before
// returning.
//
// An error is returned when the body does not contain "#EXTM3U" or when an
// #EXTINF duration cannot be parsed.
func parseM3U8(stream *ThisStream) (err error) {
var debug string
var noNewSegment = false
var lastSegmentDuration float64
// segment is shared by all lines: tags update it, URL lines complete it.
var segment Segment
var m3u8Segments []Segment
// sequence is the running media sequence number assigned to parsed segments.
var sequence int64
stream.DynamicBandwidth = false
debug = fmt.Sprintf(`M3U8 Playlist:`+"\n"+`%s`, stream.Body)
showDebug(debug, 3)
// getBandwidth returns the integer value of the BANDWIDTH= attribute in a
// comma separated attribute list, or 0 when it is missing or not a number.
var getBandwidth = func(line string) int {
var infos = strings.Split(line, ",")
for _, info := range infos {
if strings.Contains(info, "BANDWIDTH=") {
var bandwidth = strings.Replace(info, "BANDWIDTH=", "", -1)
n, err := strconv.Atoi(bandwidth)
if err == nil {
return n
}
}
}
return 0
}
// parseParameter applies one tag line to segment (and, for the media
// sequence, to stream.Sequence and the local sequence counter). Only an
// unparsable #EXTINF duration is reported as an error.
var parseParameter = func(line string, segment *Segment) (err error) {
line = strings.Trim(line, "\r\n")
var parameters = []string{"#EXT-X-VERSION:", "#EXT-X-PLAYLIST-TYPE:", "#EXT-X-MEDIA-SEQUENCE:", "#EXT-X-STREAM-INF:", "#EXTINF:"}
for _, parameter := range parameters {
if strings.Contains(line, parameter) {
var value = strings.Replace(line, parameter, "", -1)
switch parameter {
case "#EXT-X-VERSION:":
version, err := strconv.Atoi(value)
if err == nil {
segment.Version = version
}
case "#EXT-X-PLAYLIST-TYPE:":
segment.PlaylistType = value
case "#EXT-X-MEDIA-SEQUENCE:":
n, err := strconv.ParseInt(value, 10, 64)
if err == nil {
stream.Sequence = n
sequence = n
}
case "#EXT-X-STREAM-INF:":
segment.Info = true
segment.StreamInf.Bandwidth = getBandwidth(value)
case "#EXTINF:":
// The duration is the part before the first comma.
var d = strings.Split(value, ",")
if len(d) > 0 {
value = strings.Replace(d[0], ",", "", -1)
duration, err := strconv.ParseFloat(value, 64)
if err == nil {
segment.Duration = duration
} else {
ShowError(err, 1050)
return err
}
}
}
}
}
return
}
// parseURL turns a URL line of the playlist into an absolute segment.URL.
var parseURL = func(line string, segment *Segment) {
// Check whether the address is a valid URL (http://... or /path/to/stream)
_, err := url.ParseRequestURI(line)
if err == nil {
// Check whether the address contains the domain
u, _ := url.Parse(line)
if len(u.Host) == 0 {
// The address does not contain the domain: prefix it with the streaming server URL
segment.URL = stream.URLStreamingServer + line
} else {
// The address contains the domain
segment.URL = line
}
} else {
// Not a URL but a file path (media/file-01.ts): replace the file name of the M3U8 URL with it
var serverURLPath = strings.Replace(stream.M3U8URL, path.Base(stream.M3U8URL), line, -1)
segment.URL = serverURLPath
}
return
}
if strings.Contains(stream.Body, "#EXTM3U") {
var lines = strings.Split(strings.Replace(stream.Body, "\r\n", "\n", -1), "\n")
if stream.DynamicBandwidth == false {
stream.DynamicStream = make(map[int]DynamicStream)
}
// Parse the parameters
for i, line := range lines {
_ = i
if len(line) > 0 {
if line[0:1] == "#" {
err := parseParameter(line, &segment)
if err != nil {
return err
}
lastSegmentDuration = segment.Duration
}
// The M3U8 contains several links to further M3U8 playlists (bandwidth options)
if segment.Info == true && len(line) > 0 && line[0:1] != "#" {
var dynamicStream DynamicStream
segment.Duration = 0
noNewSegment = false
stream.DynamicBandwidth = true
parseURL(line, &segment)
dynamicStream.Bandwidth = segment.StreamInf.Bandwidth
dynamicStream.URL = segment.URL
stream.DynamicStream[dynamicStream.Bandwidth] = dynamicStream
}
// Segment with a TS stream
if segment.Duration > 0 && line[0:1] != "#" {
parseURL(line, &segment)
if len(segment.URL) > 0 {
segment.Sequence = sequence
m3u8Segments = append(m3u8Segments, segment)
sequence++
}
}
}
}
} else {
err = errors.New(getErrMsg(4051))
return
}
if len(m3u8Segments) > 0 {
noNewSegment = true
// On the first pass (stream not ready yet) the last listed segment is ignored.
if stream.Status == false {
if len(m3u8Segments) >= 2 {
m3u8Segments = m3u8Segments[0 : len(m3u8Segments)-1]
}
}
for _, s := range m3u8Segments {
segment = s
if stream.Status == false {
// First pass: keep iterating so that the last remaining segment is selected.
noNewSegment = false
stream.LastSequence = segment.Sequence
// The stream is of type VOD. The first segment of the M3U8 playlist must be used.
if strings.ToUpper(segment.PlaylistType) == "VOD" {
break
}
} else {
// Stream is running: take the first segment newer than the last one played.
if segment.Sequence > stream.LastSequence {
stream.LastSequence = segment.Sequence
noNewSegment = false
break
}
}
}
}
if noNewSegment == false {
if stream.DynamicBandwidth == true {
// NOTE(review): the error returned by switchBandwidth is discarded here.
switchBandwidth(stream)
} else {
stream.Segment = append(stream.Segment, segment)
}
}
if noNewSegment == true {
// Nothing new in the playlist: wait half a segment duration in 100 ms
// steps, stopping early when the stream folder no longer exists.
var sleep = lastSegmentDuration * 0.5
for i := 0.0; i < sleep*1000; i = i + 100 {
_ = i
time.Sleep(time.Duration(100) * time.Millisecond)
if _, err := os.Stat(stream.Folder); os.IsNotExist(err) {
break
}
}
}
return
}
// switchBandwidth chooses one of the variant playlists in stream.DynamicStream
// and appends it to stream.Segment as the next item to download.
//
// The variant with the highest bandwidth that does not exceed
// stream.NetworkBandwidth is chosen. If every variant exceeds it, or if
// stream.NetworkBandwidth is not set (0), the variant with the lowest
// bandwidth is used. An error is returned when no variants are known.
func switchBandwidth(stream *ThisStream) (err error) {
	var bandwidth = make([]int, 0, len(stream.DynamicStream))
	for key := range stream.DynamicStream {
		bandwidth = append(bandwidth, key)
	}

	if len(bandwidth) == 0 {
		err = errors.New("M3U8 does not contain streaming URLs")
		return
	}

	sort.Ints(bandwidth)

	// Start with the lowest variant and move up while the next one still fits.
	// The selection must not be reset inside the loop, otherwise the lowest
	// variant wins as soon as any variant is too large.
	var dynamicStream = stream.DynamicStream[bandwidth[0]]

	if stream.NetworkBandwidth > 0 {
		for _, b := range bandwidth {
			if b > stream.NetworkBandwidth {
				break
			}
			dynamicStream = stream.DynamicStream[b]
		}
	}

	var segment Segment
	segment.StreamInf.Bandwidth = dynamicStream.Bandwidth
	segment.URL = dynamicStream.URL
	segment.Duration = 0

	stream.Segment = append(stream.Segment, segment)
	return
}
// thirdPartyBuffer buffers a stream with an external program (FFmpeg or VLC,
// selected by Settings.Buffer).
//
// It runs as a goroutine for one stream of one playlist. The program is
// started with the configured options, its standard output is written into
// numbered files ("1.ts", "2.ts", ...) in the stream's temporary folder, and
// its standard error is logged. A new file is started whenever half of the
// configured buffer size has been written; the stream is marked as ready
// (Status = true) after the first file is complete.
//
// The function returns when clientConnection reports that no client is left,
// when the first segment is not complete after 20 seconds, on a file error, or
// when the program's output ends. Errors are attached to the stream's
// BufferClients entry so that waiting clients are terminated.
func thirdPartyBuffer(streamID int, playlistID string) {
	p, ok := BufferInformation.Load(playlistID)
	if !ok {
		return
	}

	var playlist = p.(Playlist)
	var debug, path, options, bufferType string
	var tmpSegment = 1
	var bufferSize = Settings.BufferSize * 1024
	var stream = playlist.Streams[streamID]
	var buf bytes.Buffer
	var fileSize = 0
	var streamStatus = make(chan bool)

	var tmpFolder = playlist.Streams[streamID].Folder
	var url = playlist.Streams[streamID].URL

	stream.Status = false

	bufferType = strings.ToUpper(Settings.Buffer)

	switch Settings.Buffer {
	case "ffmpeg":
		path = Settings.FFmpegPath
		options = Settings.FFmpegOptions
	case "vlc":
		path = Settings.VLCPath
		options = Settings.VLCOptions
	default:
		return
	}

	// addErrorToStream stores err in the stream's BufferClients entry.
	var addErrorToStream = func(err error) {
		var stream = playlist.Streams[streamID]

		if c, ok := BufferClients.Load(playlistID + stream.MD5); ok {
			var clients = c.(ClientConnection)
			clients.Error = err
			BufferClients.Store(playlistID+stream.MD5, clients)
		}
	}

	os.RemoveAll(getPlatformPath(tmpFolder))

	err := checkFolder(tmpFolder)
	if err != nil {
		ShowError(err, 0)
		addErrorToStream(err)
		return
	}

	err = checkFile(path)
	if err != nil {
		ShowError(err, 0)
		addErrorToStream(err)
		return
	}

	showInfo(fmt.Sprintf("%s path:%s", bufferType, path))
	showInfo("Streaming URL:" + stream.URL)

	// Create the first segment file before the process is started.
	var tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)

	f, err := os.Create(tmpFile)
	if err != nil {
		addErrorToStream(err)
		return
	}
	f.Close()

	//args = strings.Replace(args, "[USER-AGENT]", Settings.UserAgent, -1)

	// Build the arguments and set the user agent.
	var args []string

	for i, a := range strings.Split(options, " ") {
		switch bufferType {
		case "FFMPEG":
			a = strings.Replace(a, "[URL]", url, -1)
			if i == 0 {
				if len(Settings.UserAgent) != 0 {
					args = []string{"-user_agent", Settings.UserAgent}
				}
			}

			args = append(args, a)

		case "VLC":
			if a == "[URL]" {
				a = strings.Replace(a, "[URL]", url, -1)
				args = append(args, a)

				if len(Settings.UserAgent) != 0 {
					args = append(args, fmt.Sprintf(":http-user-agent=%s", Settings.UserAgent))
				}
			} else {
				args = append(args, a)
			}
		}
	}

	var cmd = exec.Command(path, args...)

	debug = fmt.Sprintf("%s:%s %s", bufferType, path, args)
	showDebug(debug, 1)

	// Byte data from the process. The process is not started yet, so
	// cmd.Process is nil here and must not be used for cleanup.
	stdOut, err := cmd.StdoutPipe()
	if err != nil {
		ShowError(err, 0)
		addErrorToStream(err)
		return
	}

	// Log data from the process.
	logOut, err := cmd.StderrPipe()
	if err != nil {
		ShowError(err, 0)
		stdOut.Close()
		addErrorToStream(err)
		return
	}

	if len(buf.Bytes()) == 0 && stream.Status == false {
		showInfo(bufferType + ":Processing data")
	}

	if err = cmd.Start(); err != nil {
		ShowError(err, 0)
		addErrorToStream(err)
		return
	}
	defer cmd.Wait()

	// stopProcess kills the external program and waits for it to exit.
	var stopProcess = func() {
		cmd.Process.Kill()
		cmd.Wait()
	}

	go func() {
		// Show the process log as info until the first segment is complete,
		// afterwards only in debug mode 1.
		scanner := bufio.NewScanner(logOut)
		scanner.Split(bufio.ScanLines)

		for scanner.Scan() {
			var line = fmt.Sprintf("%s log:%s", bufferType, strings.TrimSpace(scanner.Text()))

			select {
			case <-streamStatus:
				showDebug(line, 1)
			default:
				showInfo(line)
			}

			time.Sleep(time.Duration(10) * time.Millisecond)
		}
	}()

	f, err = os.OpenFile(tmpFile, os.O_APPEND|os.O_WRONLY, 0600)
	if err != nil {
		// Report the error to the clients instead of crashing the server.
		stopProcess()
		ShowError(err, 0)
		addErrorToStream(err)
		return
	}

	// The closure closes whichever segment file is current on return.
	defer func() {
		f.Close()
	}()

	buffer := make([]byte, 1024*4)
	reader := bufio.NewReader(stdOut)

	// Time limit for completing the first segment.
	// NOTE: it is only checked between reads; a read that blocks forever is
	// not interrupted by it.
	const firstSegmentTimeout = 20 * time.Second
	var started = time.Now()

	for {
		if tmpSegment == 1 && time.Since(started) >= firstSegmentTimeout {
			stopProcess()
			err = errors.New("Timout")
			ShowError(err, 4006)
			addErrorToStream(err)
			return
		}

		if fileSize == 0 && stream.Status == false {
			showInfo("Streaming Status:Receive data from " + bufferType)
		}

		if clientConnection(stream) == false {
			stopProcess()
			return
		}

		n, err := reader.Read(buffer)
		if err == io.EOF {
			break
		}
		if err != nil {
			// Any other read error is permanent for this pipe; continuing
			// would loop without ever receiving data.
			ShowError(err, 0)
			break
		}

		fileSize = fileSize + n

		if _, err := f.Write(buffer[:n]); err != nil {
			stopProcess()
			ShowError(err, 0)
			addErrorToStream(err)
			return
		}

		if fileSize >= bufferSize/2 {
			if tmpSegment == 1 && stream.Status == false {
				// First segment complete: switch the process log to debug output.
				close(streamStatus)
				showInfo(fmt.Sprintf("Streaming Status:Buffering data from %s", bufferType))
			}

			f.Close()
			tmpSegment++

			if stream.Status == false {
				Lock.Lock()
				stream.Status = true
				playlist.Streams[streamID] = stream
				BufferInformation.Store(playlistID, playlist)
				Lock.Unlock()
			}

			tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
			fileSize = 0

			// A single handle per segment; opening the file twice would leak
			// the first handle.
			f, err = os.Create(tmpFile)
			if err != nil {
				stopProcess()
				ShowError(err, 0)
				addErrorToStream(err)
				return
			}
		}
	}

	// The output of the process has ended; this is always reported as an error.
	stopProcess()

	err = errors.New(bufferType + " error")
	addErrorToStream(err)
	ShowError(err, 1204)

	time.Sleep(time.Duration(500) * time.Millisecond)
	clientConnection(stream)
}
// getTuner returns the number of tuners for a playlist.
//
// With buffering disabled (Settings.Buffer "-") the global Settings.Tuner is
// returned. With one of the buffers "xteve", "ffmpeg" or "vlc" the provider's
// "tuner" parameter is used; if it is not a number the error is shown and 1 is
// returned. Any other buffer setting yields 0.
func getTuner(id, playlistType string) (tuner int) {
	var mode = Settings.Buffer

	if mode == "-" {
		return Settings.Tuner
	}

	if mode != "xteve" && mode != "ffmpeg" && mode != "vlc" {
		return 0
	}

	limit, err := strconv.Atoi(getProviderParameter(id, playlistType, "tuner"))
	if err != nil {
		ShowError(err, 0)
		return 1
	}

	return limit
}
// debugRequest logs method, protocol, URL and all headers of req at debug
// level 3. It does nothing when System.Flag.Debug is below that level.
func debugRequest(req *http.Request) {
	const debugLevel = 3

	if System.Flag.Debug < debugLevel {
		return
	}

	fmt.Println()

	showDebug("Request:* * * * * * BEGIN HTTP(S) REQUEST * * * * * * ", debugLevel)
	showDebug(fmt.Sprintf("Method:%s", req.Method), debugLevel)
	showDebug(fmt.Sprintf("Proto:%s", req.Proto), debugLevel)
	showDebug(fmt.Sprintf("URL:%s", req.URL), debugLevel)

	// Header names are printed in lower case, one line per value.
	for key, values := range req.Header {
		var lowered = strings.ToLower(key)
		for i := range values {
			showDebug("Header:"+lowered+": "+values[i], debugLevel)
		}
	}

	showDebug("Request:* * * * * * END HTTP(S) REQUEST * * * * * *", debugLevel)
}
// debugResponse logs protocol, status and all headers of resp at debug
// level 3. It does nothing when System.Flag.Debug is below that level.
func debugResponse(resp *http.Response) {
	var debugLevel = 3

	if System.Flag.Debug < debugLevel {
		return
	}

	var debug string

	fmt.Println()

	debug = "Response:* * * * * * BEGIN RESPONSE * * * * * * "
	showDebug(debug, debugLevel)

	debug = fmt.Sprintf("Proto:%s", resp.Proto)
	showDebug(debug, debugLevel)

	debug = fmt.Sprintf("Status Code:%d", resp.StatusCode)
	showDebug(debug, debugLevel)

	debug = fmt.Sprintf("Status Text:%s", http.StatusText(resp.StatusCode))
	showDebug(debug, debugLevel)

	// Header values are always []string; multiple values are joined by a space.
	for key, value := range resp.Header {
		debug = fmt.Sprintf("Header:%v: %s", key, strings.Join(value, " "))
		showDebug(debug, debugLevel)
	}

	debug = "Response:* * * * * * END RESPONSE * * * * * * "
	showDebug(debug, debugLevel)
}