This commit is contained in:
marmei
2019-08-31 20:39:39 +02:00
parent 717fa68b7e
commit efa55b39a9
3 changed files with 352 additions and 346 deletions

View File

@@ -205,7 +205,7 @@ func bufferingStream(playlistID, streamingURL, channelName string, w http.Respon
playlist.Streams[streamID] = stream
BufferInformation.Store(playlistID, playlist)
go connectToStreamingServer(streamID, playlist)
go connectToStreamingServer(streamID, playlistID)
showInfo(fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner))
@@ -507,176 +507,191 @@ func clientConnection(stream ThisStream) (status bool) {
return
}
func connectToStreamingServer(streamID int, playlist Playlist) {
func connectToStreamingServer(streamID int, playlistID string) {
var timeOut = 0
var debug string
var tmpSegment = 1
var tmpFolder = playlist.Streams[streamID].Folder
var m3u8Segments []string
var bandwidth BandwidthCalculation
var networkBandwidth = Settings.M3U8AdaptiveBandwidthMBPS * 1e+6
if p, ok := BufferInformation.Load(playlistID); ok {
var defaultSegment = func() {
var playlist = p.(Playlist)
var segment Segment
var timeOut = 0
var debug string
var tmpSegment = 1
var tmpFolder = playlist.Streams[streamID].Folder
var m3u8Segments []string
var bandwidth BandwidthCalculation
var networkBandwidth = Settings.M3U8AdaptiveBandwidthMBPS * 1e+6
if len(playlist.Streams[streamID].Location) > 0 {
segment.URL = playlist.Streams[streamID].Location
} else {
segment.URL = playlist.Streams[streamID].URL
}
var defaultSegment = func() {
segment.Duration = 0
var segment Segment
var stream = playlist.Streams[streamID]
stream.Segment = []Segment{}
stream.Segment = append(stream.Segment, segment)
if len(playlist.Streams[streamID].Location) > 0 {
segment.URL = playlist.Streams[streamID].Location
} else {
segment.URL = playlist.Streams[streamID].URL
}
stream.HLS = false
stream.Sequence = 0
stream.Wait = 0
stream.NetworkBandwidth = networkBandwidth
segment.Duration = 0
playlist.Streams[streamID] = stream
var stream = playlist.Streams[streamID]
stream.Segment = []Segment{}
stream.Segment = append(stream.Segment, segment)
timeOut++
stream.HLS = false
stream.Sequence = 0
stream.Wait = 0
stream.NetworkBandwidth = networkBandwidth
}
playlist.Streams[streamID] = stream
var addErrorToStream = func(err error) {
var stream = playlist.Streams[streamID]
if c, ok := BufferClients.Load(stream.PlaylistID + stream.MD5); ok {
var clients = c.(ClientConnection)
clients.Error = err
BufferClients.Store(stream.PlaylistID+stream.MD5, clients)
timeOut++
}
}
var addErrorToStream = func(err error) {
os.RemoveAll(getPlatformPath(tmpFolder))
var stream = playlist.Streams[streamID]
err := checkFolder(tmpFolder)
if err != nil {
ShowError(err, 0)
addErrorToStream(err)
return
}
if c, ok := BufferClients.Load(playlistID + stream.MD5); ok {
// M3U8 Segmente
InitBuffer:
defaultSegment()
var clients = c.(ClientConnection)
clients.Error = err
BufferClients.Store(playlistID+stream.MD5, clients)
if len(m3u8Segments) > 30 {
m3u8Segments = m3u8Segments[15:]
}
if timeOut >= 10 {
return
}
}
var stream ThisStream = playlist.Streams[streamID]
if stream.Status == false {
if strings.Index(stream.URL, ".m3u8") != -1 {
showInfo("Streaming Type:" + "[HLS / M3U8]")
} else {
showInfo("Streaming Type:" + "[TS]")
}
showInfo("Streaming URL:" + stream.URL)
os.RemoveAll(getPlatformPath(tmpFolder))
}
var s = 0
stream.TimeStart = time.Now()
bandwidth.Start = stream.TimeStart
bandwidth.Size = 0
for {
if clientConnection(stream) == false {
err := checkFolder(tmpFolder)
if err != nil {
ShowError(err, 0)
addErrorToStream(err)
return
}
if len(stream.Segment) == 0 || len(stream.URL) == 0 {
goto InitBuffer
// M3U8 Segmente
InitBuffer:
defaultSegment()
if len(m3u8Segments) > 30 {
m3u8Segments = m3u8Segments[15:]
}
if timeOut >= 10 {
return
}
var segment = stream.Segment[0]
var stream ThisStream = playlist.Streams[streamID]
var currentURL = strings.Trim(segment.URL, "\r\n")
if stream.Status == false {
if strings.Index(stream.URL, ".m3u8") != -1 {
showInfo("Streaming Type:" + "[HLS / M3U8]")
} else {
showInfo("Streaming Type:" + "[TS]")
}
showInfo("Streaming URL:" + stream.URL)
if len(currentURL) == 0 {
goto InitBuffer
}
debug = fmt.Sprintf("Connection to:%s", currentURL)
showDebug(debug, 2)
var s = 0
// Sprung für Redirect (301 <---> 308)
Redirect:
stream.TimeStart = time.Now()
bandwidth.Start = stream.TimeStart
bandwidth.Size = 0
req, err := http.NewRequest("GET", currentURL, nil)
req.Header.Set("User-Agent", Settings.UserAgent)
req.Header.Set("Connection", "close")
//req.Header.Set("Range", "bytes=0-")
req.Header.Set("Accept", "*/*")
debugRequest(req)
client := &http.Client{}
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return errors.New("Redirect")
}
resp, err := client.Do(req)
if resp != nil && err != nil {
debugResponse(resp)
}
if err != nil {
if resp == nil {
err = errors.New("No response from streaming server")
fmt.Println("Current URL:", currentURL)
ShowError(err, 0)
addErrorToStream(err)
killClientConnection(streamID, stream.PlaylistID, true)
clientConnection(stream)
for {
if clientConnection(stream) == false {
return
}
// Redirect
if resp.StatusCode >= 301 && resp.StatusCode <= 308 {
if len(stream.Segment) == 0 || len(stream.URL) == 0 {
goto InitBuffer
}
debug = fmt.Sprintf("Streaming Status:HTTP response status [%d] %s", resp.StatusCode, http.StatusText(resp.StatusCode))
showDebug(debug, 2)
var segment = stream.Segment[0]
currentURL = strings.Trim(resp.Header.Get("Location"), "\r\n")
var currentURL = strings.Trim(segment.URL, "\r\n")
stream.Location = currentURL
if len(currentURL) == 0 {
goto InitBuffer
}
if len(currentURL) > 0 {
debug = fmt.Sprintf("Connection to:%s", currentURL)
showDebug(debug, 2)
debug = fmt.Sprintf("HTTP Redirect:%s", stream.Location)
// Sprung für Redirect (301 <---> 308)
Redirect:
req, err := http.NewRequest("GET", currentURL, nil)
req.Header.Set("User-Agent", Settings.UserAgent)
req.Header.Set("Connection", "close")
//req.Header.Set("Range", "bytes=0-")
req.Header.Set("Accept", "*/*")
debugRequest(req)
client := &http.Client{}
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return errors.New("Redirect")
}
resp, err := client.Do(req)
if resp != nil && err != nil {
debugResponse(resp)
}
if err != nil {
if resp == nil {
err = errors.New("No response from streaming server")
fmt.Println("Current URL:", currentURL)
ShowError(err, 0)
addErrorToStream(err)
killClientConnection(streamID, playlistID, true)
clientConnection(stream)
return
}
// Redirect
if resp.StatusCode >= 301 && resp.StatusCode <= 308 {
debug = fmt.Sprintf("Streaming Status:HTTP response status [%d] %s", resp.StatusCode, http.StatusText(resp.StatusCode))
showDebug(debug, 2)
defer resp.Body.Close()
goto Redirect
currentURL = strings.Trim(resp.Header.Get("Location"), "\r\n")
stream.Location = currentURL
if len(currentURL) > 0 {
debug = fmt.Sprintf("HTTP Redirect:%s", stream.Location)
showDebug(debug, 2)
defer resp.Body.Close()
goto Redirect
} else {
err = errors.New("Streaming server")
ShowError(err, 4002)
addErrorToStream(err)
defer resp.Body.Close()
return
}
} else {
err = errors.New("Streaming server")
ShowError(err, 4002)
ShowError(err, 0)
addErrorToStream(err)
defer resp.Body.Close()
@@ -685,293 +700,232 @@ InitBuffer:
}
} else {
ShowError(err, 0)
addErrorToStream(err)
defer resp.Body.Close()
return
}
defer resp.Body.Close()
}
// HTTP Status überprüfen, bei Fehlern wird der Stream beendet
var contentType = resp.Header.Get("Content-Type")
var httpStatusCode = resp.StatusCode
var httpStatusInfo = fmt.Sprintf("HTTP Response Status [%d] %s", httpStatusCode, http.StatusText(resp.StatusCode))
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
// HTTP Status überprüfen, bei Fehlern wird der Stream beendet
var contentType = resp.Header.Get("Content-Type")
var httpStatusCode = resp.StatusCode
var httpStatusInfo = fmt.Sprintf("HTTP Response Status [%d] %s", httpStatusCode, http.StatusText(resp.StatusCode))
showInfo("Content Type:" + contentType)
showInfo("Streaming Status:" + httpStatusInfo)
showInfo("Error with this URL:" + currentURL)
if resp.StatusCode != http.StatusOK {
var err = errors.New(http.StatusText(resp.StatusCode))
ShowError(err, 4004)
showInfo("Content Type:" + contentType)
showInfo("Streaming Status:" + httpStatusInfo)
showInfo("Error with this URL:" + currentURL)
debug = fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner)
showDebug(debug, 1)
var err = errors.New(http.StatusText(resp.StatusCode))
ShowError(err, 4004)
debug = fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner)
showDebug(debug, 1)
BufferInformation.Store(playlist.PlaylistID, playlist)
addErrorToStream(err)
killClientConnection(streamID, stream.PlaylistID, true)
clientConnection(stream)
resp.Body.Close()
return
}
// Informationen über den Streamingserver auslesen
if stream.Status == false {
if len(stream.URLStreamingServer) == 0 {
u, _ := url.Parse(currentURL)
p, _ := url.Parse(currentURL)
stream.URLScheme = u.Scheme
stream.URLHost = req.Host
stream.URLPath = p.Path
stream.URLFile = path.Base(p.Path)
stream.URLRedirect = fmt.Sprintf("%s://%s%s", stream.URLScheme, stream.URLHost, stream.URLPath)
stream.URLStreamingServer = fmt.Sprintf("%s://%s", stream.URLScheme, stream.URLHost)
}
debug = fmt.Sprintf("Server URL:%s", stream.URLStreamingServer)
showDebug(debug, 1)
debug = fmt.Sprintf("Temp Folder:%s", tmpFolder)
showDebug(debug, 1)
showInfo("Streaming Status:" + "HTTP Response Status [" + strconv.Itoa(resp.StatusCode) + "] " + http.StatusText(resp.StatusCode))
showInfo("Content Type:" + contentType)
} else {
debug = fmt.Sprintf("Content Type:%s", contentType)
showDebug(debug, 2)
}
// Content Type bereinigen
if len(contentType) > 0 {
var ct = strings.SplitN(contentType, ";", 2)
contentType = strings.ToLower(ct[0])
}
switch contentType {
// M3U8 Playlist
case "application/x-mpegurl", "application/vnd.apple.mpegurl", "audio/mpegurl":
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
ShowError(err, 0)
BufferInformation.Store(playlist.PlaylistID, playlist)
addErrorToStream(err)
}
stream.Body = string(body)
stream.HLS = true
stream.M3U8URL = currentURL
err = parseM3U8(&stream)
if err != nil {
ShowError(err, 4050)
addErrorToStream(err)
}
// Video Stream (TS)
case "video/mpeg", "video/mp4", "video/mp2t", "video/m2ts", "application/octet-stream", "binary/octet-stream", "application/mp2t":
var fileSize int
// Größe des Buffers
buffer := make([]byte, 1024*Settings.BufferSize*2)
var tmpFileSize = 1024 * Settings.BufferSize * 1
debug = fmt.Sprintf("Buffer Size:%d KB [SERVER CONNECTION]", len(buffer)/1024)
showDebug(debug, 3)
debug = fmt.Sprintf("Buffer Size:%d KB [CLIENT CONNECTION]", tmpFileSize/1024)
showDebug(debug, 3)
var tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
if clientConnection(stream) == false {
killClientConnection(streamID, playlistID, true)
clientConnection(stream)
resp.Body.Close()
return
}
bufferFile, err := os.Create(tmpFile)
if err != nil {
// Informationen über den Streamingserver auslesen
if stream.Status == false {
addErrorToStream(err)
bufferFile.Close()
resp.Body.Close()
return
if len(stream.URLStreamingServer) == 0 {
u, _ := url.Parse(currentURL)
p, _ := url.Parse(currentURL)
stream.URLScheme = u.Scheme
stream.URLHost = req.Host
stream.URLPath = p.Path
stream.URLFile = path.Base(p.Path)
stream.URLRedirect = fmt.Sprintf("%s://%s%s", stream.URLScheme, stream.URLHost, stream.URLPath)
stream.URLStreamingServer = fmt.Sprintf("%s://%s", stream.URLScheme, stream.URLHost)
}
debug = fmt.Sprintf("Server URL:%s", stream.URLStreamingServer)
showDebug(debug, 1)
debug = fmt.Sprintf("Temp Folder:%s", tmpFolder)
showDebug(debug, 1)
showInfo("Streaming Status:" + "HTTP Response Status [" + strconv.Itoa(resp.StatusCode) + "] " + http.StatusText(resp.StatusCode))
showInfo("Content Type:" + contentType)
} else {
debug = fmt.Sprintf("Content Type:%s", contentType)
showDebug(debug, 2)
}
for {
// Content Type bereinigen
if len(contentType) > 0 {
var ct = strings.SplitN(contentType, ";", 2)
contentType = strings.ToLower(ct[0])
}
if fileSize == 0 {
debug = fmt.Sprintf("Buffer Status:Buffering (%s)", tmpFile)
showDebug(debug, 2)
}
timeOut = 0
// Buffer mit Daten vom Server füllen
n, err := resp.Body.Read(buffer)
if err != nil && err != io.EOF {
switch contentType {
// M3U8 Playlist
case "application/x-mpegurl", "application/vnd.apple.mpegurl", "audio/mpegurl":
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
ShowError(err, 0)
addErrorToStream(err)
resp.Body.Close()
return
}
defer resp.Body.Close()
stream.Body = string(body)
stream.HLS = true
stream.M3U8URL = currentURL
if _, err := bufferFile.Write(buffer[:n]); err != nil {
ShowError(err, 0)
err = parseM3U8(&stream)
if err != nil {
ShowError(err, 4050)
addErrorToStream(err)
resp.Body.Close()
return
}
defer bufferFile.Close()
// Video Stream (TS)
case "video/mpeg", "video/mp4", "video/mp2t", "video/m2ts", "application/octet-stream", "binary/octet-stream", "application/mp2t":
fileSize = fileSize + n
var fileSize int
// Größe des Buffers
buffer := make([]byte, 1024*Settings.BufferSize*2)
var tmpFileSize = 1024 * Settings.BufferSize * 1
debug = fmt.Sprintf("Buffer Size:%d KB [SERVER CONNECTION]", len(buffer)/1024)
showDebug(debug, 3)
debug = fmt.Sprintf("Buffer Size:%d KB [CLIENT CONNECTION]", tmpFileSize/1024)
showDebug(debug, 3)
var tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
if clientConnection(stream) == false {
resp.Body.Close()
bufferFile.Close()
return
}
err = os.RemoveAll(stream.Folder)
if err != nil {
ShowError(err, 4005)
}
bufferFile, err := os.Create(tmpFile)
if err != nil {
addErrorToStream(err)
bufferFile.Close()
resp.Body.Close()
return
}
// Buffer auf die Festplatte speichern
if fileSize >= tmpFileSize || n == 0 {
for {
bandwidth.Stop = time.Now()
bandwidth.Size += fileSize
if fileSize == 0 {
bandwidth.TimeDiff = bandwidth.Stop.Sub(bandwidth.Start).Seconds()
debug = fmt.Sprintf("Buffer Status:Buffering (%s)", tmpFile)
showDebug(debug, 2)
networkBandwidth = int(float64(bandwidth.Size) / bandwidth.TimeDiff * 1000)
stream.NetworkBandwidth = networkBandwidth
bandwidth.NetworkBandwidth = stream.NetworkBandwidth
debug = fmt.Sprintf("Buffer Status:Done (%s)", tmpFile)
showDebug(debug, 2)
bufferFile.Close()
if stream.Status == false {
stream.Status = true
playlist.Streams[streamID] = stream
}
tmpSegment++
timeOut = 0
// Buffer mit Daten vom Server füllen
n, err := resp.Body.Read(buffer)
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
if err != nil && err != io.EOF {
ShowError(err, 0)
addErrorToStream(err)
resp.Body.Close()
return
}
defer resp.Body.Close()
if _, err := bufferFile.Write(buffer[:n]); err != nil {
ShowError(err, 0)
addErrorToStream(err)
resp.Body.Close()
return
}
defer bufferFile.Close()
fileSize = fileSize + n
if clientConnection(stream) == false {
bufferFile.Close()
resp.Body.Close()
bufferFile.Close()
err = os.RemoveAll(stream.Folder)
if err != nil {
ShowError(err, 4005)
}
return
}
bufferFile, err = os.Create(tmpFile)
if err != nil {
addErrorToStream(err)
resp.Body.Close()
return
}
// Buffer auf die Festplatte speichern
if fileSize >= tmpFileSize || n == 0 {
fileSize = 0
bandwidth.Stop = time.Now()
bandwidth.Size += fileSize
bandwidth.TimeDiff = bandwidth.Stop.Sub(bandwidth.Start).Seconds()
networkBandwidth = int(float64(bandwidth.Size) / bandwidth.TimeDiff * 1000)
stream.NetworkBandwidth = networkBandwidth
bandwidth.NetworkBandwidth = stream.NetworkBandwidth
debug = fmt.Sprintf("Buffer Status:Done (%s)", tmpFile)
showDebug(debug, 2)
if n == 0 {
bufferFile.Close()
resp.Body.Close()
break
}
}
stream.Status = true
playlist.Streams[streamID] = stream
BufferInformation.Store(playlistID, playlist)
}
tmpSegment++
//--
tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment)
// Umbekanntes Format
default:
showInfo("Content Type:" + resp.Header.Get("Content-Type"))
err = errors.New("Streaming error")
ShowError(err, 4003)
if clientConnection(stream) == false {
addErrorToStream(err)
resp.Body.Close()
return
}
bufferFile.Close()
resp.Body.Close()
s++
err = os.RemoveAll(stream.Folder)
if err != nil {
ShowError(err, 4005)
}
// Wartezeit für den Download das nächste Segments berechnen
if stream.HLS == true {
return
}
var sleep float64
bufferFile, err = os.Create(tmpFile)
if err != nil {
addErrorToStream(err)
resp.Body.Close()
return
}
if segment.Duration > 0 {
fileSize = 0
stream.TimeEnd = time.Now()
stream.TimeDiff = stream.TimeEnd.Sub(stream.TimeStart).Seconds()
sleep = (segment.Duration - stream.TimeDiff) - (segment.Duration * 0.25)
if sleep < 0 {
sleep = 0
}
debug = fmt.Sprintf("HLS Status:Download time: %f s | Segment duration: %f s | Sleep: %f s Sequence: %d", stream.TimeDiff, segment.Duration, sleep, segment.Sequence)
showDebug(debug, 1)
if sleep > 0 {
for i := 0.0; i < sleep*1000; i = i + 100 {
_ = i
time.Sleep(time.Duration(100) * time.Millisecond)
if _, err := os.Stat(stream.Folder); os.IsNotExist(err) {
if n == 0 {
bufferFile.Close()
resp.Body.Close()
break
}
@@ -979,15 +933,66 @@ InitBuffer:
}
//--
// Umbekanntes Format
default:
showInfo("Content Type:" + resp.Header.Get("Content-Type"))
err = errors.New("Streaming error")
ShowError(err, 4003)
addErrorToStream(err)
resp.Body.Close()
return
}
}
s++
stream.Segment = stream.Segment[1:len(stream.Segment)]
// Wartezeit für den Download das nächste Segments berechnen
if stream.HLS == true {
resp.Body.Close()
var sleep float64
} // Ende for loop
if segment.Duration > 0 {
stream.TimeEnd = time.Now()
stream.TimeDiff = stream.TimeEnd.Sub(stream.TimeStart).Seconds()
sleep = (segment.Duration - stream.TimeDiff) - (segment.Duration * 0.25)
if sleep < 0 {
sleep = 0
}
debug = fmt.Sprintf("HLS Status:Download time: %f s | Segment duration: %f s | Sleep: %f s Sequence: %d", stream.TimeDiff, segment.Duration, sleep, segment.Sequence)
showDebug(debug, 1)
if sleep > 0 {
for i := 0.0; i < sleep*1000; i = i + 100 {
_ = i
time.Sleep(time.Duration(100) * time.Millisecond)
if _, err := os.Stat(stream.Folder); os.IsNotExist(err) {
break
}
}
}
}
}
stream.Segment = stream.Segment[1:len(stream.Segment)]
resp.Body.Close()
} // Ende for loop
} // Ende BufferInformation
}

View File

@@ -620,6 +620,7 @@ func saveWizard(request RequestStruct) (nextStep int, err error) {
}
buildXEPG(false)
System.ScanInProgress = 0
}