diff --git a/src/buffer.go b/src/buffer.go index aeb23ff..1832649 100644 --- a/src/buffer.go +++ b/src/buffer.go @@ -205,7 +205,7 @@ func bufferingStream(playlistID, streamingURL, channelName string, w http.Respon playlist.Streams[streamID] = stream BufferInformation.Store(playlistID, playlist) - go connectToStreamingServer(streamID, playlist) + go connectToStreamingServer(streamID, playlistID) showInfo(fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner)) @@ -219,163 +219,169 @@ func bufferingStream(playlistID, streamingURL, channelName string, w http.Respon for { // Loop 1: Warten bis das erste Segment durch den Buffer heruntergeladen wurde - if stream, ok := playlist.Streams[streamID]; ok { + if p, ok := BufferInformation.Load(playlistID); ok { - if stream.Status == false { + var playlist = p.(Playlist) - timeOut++ + if stream, ok := playlist.Streams[streamID]; ok { - time.Sleep(time.Duration(100) * time.Millisecond) + if stream.Status == false { - if c, ok := BufferClients.Load(playlistID + stream.MD5); ok { + timeOut++ - var clients = c.(ClientConnection) + time.Sleep(time.Duration(100) * time.Millisecond) - if clients.Error != nil || timeOut > 200 { - killClientConnection(streamID, stream.PlaylistID, false) - return - } + if c, ok := BufferClients.Load(playlistID + stream.MD5); ok { - } - - continue - } - - var oldSegments []string - - for { // Loop 2: Temporäre Datein sind vorhanden, Daten können zum Client gesendet werden - // HTTP Clientverbindung überwachen - cn, ok := w.(http.CloseNotifier) - if ok { - - select { - - case <-cn.CloseNotify(): - killClientConnection(streamID, playlistID, false) - return - - default: - if c, ok := BufferClients.Load(playlistID + stream.MD5); ok { - - var clients = c.(ClientConnection) - if clients.Error != nil { - ShowError(clients.Error, 0) - killClientConnection(streamID, playlistID, false) - return - } - - } else { + var clients = c.(ClientConnection) + if clients.Error != nil || timeOut > 200 { + killClientConnection(streamID, stream.PlaylistID, false) return - } } + continue } - if _, err := os.Stat(stream.Folder); os.IsNotExist(err) { - killClientConnection(streamID, playlistID, false) - return - } + var oldSegments []string - var tmpFiles = getTmpFiles(&stream) - //fmt.Println("Buffer Loop:", stream.Connection) + for { // Loop 2: Temporäre Datein sind vorhanden, Daten können zum Client gesendet werden + // HTTP Clientverbindung überwachen + cn, ok := w.(http.CloseNotifier) + if ok { - for _, f := range tmpFiles { + select { + + case <-cn.CloseNotify(): + killClientConnection(streamID, playlistID, false) + return + + default: + if c, ok := BufferClients.Load(playlistID + stream.MD5); ok { + + var clients = c.(ClientConnection) + if clients.Error != nil { + ShowError(clients.Error, 0) + killClientConnection(streamID, playlistID, false) + return + } + + } else { + + return + + } + + } + + } if _, err := os.Stat(stream.Folder); os.IsNotExist(err) { killClientConnection(streamID, playlistID, false) return } - oldSegments = append(oldSegments, f) + var tmpFiles = getTmpFiles(&stream) + //fmt.Println("Buffer Loop:", stream.Connection) - var fileName = stream.Folder + f + for _, f := range tmpFiles { - file, err := os.Open(fileName) - defer file.Close() + if _, err := os.Stat(stream.Folder); os.IsNotExist(err) { + killClientConnection(streamID, playlistID, false) + return + } - if err == nil { + oldSegments = append(oldSegments, f) + + var fileName = stream.Folder + f + + file, err := os.Open(fileName) + defer file.Close() - l, err := file.Stat() if err == nil { - debug = fmt.Sprintf("Buffer Status:Send to client (%s)", fileName) - showDebug(debug, 2) - - var buffer = make([]byte, int(l.Size())) - _, err = file.Read(buffer) - + l, err := file.Stat() if err == nil { - file.Seek(0, 0) + debug = fmt.Sprintf("Buffer Status:Send to client (%s)", fileName) + showDebug(debug, 2) - if streaming == false { + var buffer = make([]byte, int(l.Size())) + _, err = file.Read(buffer) - contentType := http.DetectContentType(buffer) - _ = contentType - //w.Header().Set("Content-type", "video/mpeg") - w.Header().Set("Content-type", contentType) - w.Header().Set("Content-Length", "0") - w.Header().Set("Connection", "close") + if err == nil { - } + file.Seek(0, 0) - /* - // HDHR Header - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Pragma", "no-cache") - w.Header().Set("transferMode.dlna.org", "Streaming") - */ + if streaming == false { - _, err := w.Write(buffer) + contentType := http.DetectContentType(buffer) + _ = contentType + //w.Header().Set("Content-type", "video/mpeg") + w.Header().Set("Content-type", contentType) + w.Header().Set("Content-Length", "0") + w.Header().Set("Connection", "close") + + } + + /* + // HDHR Header + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Pragma", "no-cache") + w.Header().Set("transferMode.dlna.org", "Streaming") + */ + + _, err := w.Write(buffer) + + if err != nil { + file.Close() + killClientConnection(streamID, playlistID, false) + return + } - if err != nil { file.Close() - killClientConnection(streamID, playlistID, false) - return + streaming = true + } file.Close() - streaming = true } - file.Close() + var n = indexOfString(f, oldSegments) + + if n > 20 { + + var fileToRemove = stream.Folder + oldSegments[0] + os.RemoveAll(getPlatformFile(fileToRemove)) + oldSegments = append(oldSegments[:0], oldSegments[0+1:]...) + + } } - var n = indexOfString(f, oldSegments) - - if n > 20 { - - var fileToRemove = stream.Folder + oldSegments[0] - os.RemoveAll(getPlatformFile(fileToRemove)) - oldSegments = append(oldSegments[:0], oldSegments[0+1:]...) - - } + file.Close() } - file.Close() + if len(tmpFiles) == 0 { + time.Sleep(time.Duration(100) * time.Millisecond) + } - } + } // Ende Loop 2 - if len(tmpFiles) == 0 { - time.Sleep(time.Duration(100) * time.Millisecond) - } + } else { - } // Ende Loop 2 + // Stream nicht vorhanden + killClientConnection(streamID, stream.PlaylistID, false) + showInfo(fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner)) + return - } else { + } - // Stream nicht vorhanden - killClientConnection(streamID, stream.PlaylistID, false) - showInfo(fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner)) - return - - } + } // Ende BufferInformation } // Ende Loop 1 @@ -507,176 +513,191 @@ func clientConnection(stream ThisStream) (status bool) { return } -func connectToStreamingServer(streamID int, playlist Playlist) { +func connectToStreamingServer(streamID int, playlistID string) { - var timeOut = 0 - var debug string - var tmpSegment = 1 - var tmpFolder = playlist.Streams[streamID].Folder - var m3u8Segments []string - var bandwidth BandwidthCalculation - var networkBandwidth = Settings.M3U8AdaptiveBandwidthMBPS * 1e+6 + if p, ok := BufferInformation.Load(playlistID); ok { - var defaultSegment = func() { + var playlist = p.(Playlist) - var segment Segment + var timeOut = 0 + var debug string + var tmpSegment = 1 + var tmpFolder = playlist.Streams[streamID].Folder + var m3u8Segments []string + var bandwidth BandwidthCalculation + var networkBandwidth = Settings.M3U8AdaptiveBandwidthMBPS * 1e+6 - if len(playlist.Streams[streamID].Location) > 0 { - segment.URL = playlist.Streams[streamID].Location - } else { - segment.URL = playlist.Streams[streamID].URL - } + var defaultSegment = func() { - segment.Duration = 0 + var segment Segment - var stream = playlist.Streams[streamID] - stream.Segment = []Segment{} - stream.Segment = append(stream.Segment, segment) + if len(playlist.Streams[streamID].Location) > 0 { + segment.URL = playlist.Streams[streamID].Location + } else { + segment.URL = playlist.Streams[streamID].URL + } - stream.HLS = false - stream.Sequence = 0 - stream.Wait = 0 - stream.NetworkBandwidth = networkBandwidth + segment.Duration = 0 - playlist.Streams[streamID] = stream + var stream = playlist.Streams[streamID] + stream.Segment = []Segment{} + stream.Segment = append(stream.Segment, segment) - timeOut++ + stream.HLS = false + stream.Sequence = 0 + stream.Wait = 0 + stream.NetworkBandwidth = networkBandwidth - } + playlist.Streams[streamID] = stream - var addErrorToStream = func(err error) { - - var stream = playlist.Streams[streamID] - - if c, ok := BufferClients.Load(stream.PlaylistID + stream.MD5); ok { - - var clients = c.(ClientConnection) - clients.Error = err - BufferClients.Store(stream.PlaylistID+stream.MD5, clients) + timeOut++ } - } + var addErrorToStream = func(err error) { - os.RemoveAll(getPlatformPath(tmpFolder)) + var stream = playlist.Streams[streamID] - err := checkFolder(tmpFolder) - if err != nil { - ShowError(err, 0) - addErrorToStream(err) - return - } + if c, ok := BufferClients.Load(playlistID + stream.MD5); ok { - // M3U8 Segmente -InitBuffer: - defaultSegment() + var clients = c.(ClientConnection) + clients.Error = err + BufferClients.Store(playlistID+stream.MD5, clients) - if len(m3u8Segments) > 30 { - m3u8Segments = m3u8Segments[15:] - } - if timeOut >= 10 { - return - } + } - var stream ThisStream = playlist.Streams[streamID] - - if stream.Status == false { - - if strings.Index(stream.URL, ".m3u8") != -1 { - showInfo("Streaming Type:" + "[HLS / M3U8]") - } else { - showInfo("Streaming Type:" + "[TS]") } - showInfo("Streaming URL:" + stream.URL) + os.RemoveAll(getPlatformPath(tmpFolder)) - } - - var s = 0 - - stream.TimeStart = time.Now() - bandwidth.Start = stream.TimeStart - bandwidth.Size = 0 - - for { - - if clientConnection(stream) == false { + err := checkFolder(tmpFolder) + if err != nil { + ShowError(err, 0) + addErrorToStream(err) return } - if len(stream.Segment) == 0 || len(stream.URL) == 0 { - goto InitBuffer + // M3U8 Segmente + InitBuffer: + defaultSegment() + + if len(m3u8Segments) > 30 { + m3u8Segments = m3u8Segments[15:] + } + if timeOut >= 10 { + return } - var segment = stream.Segment[0] + var stream ThisStream = playlist.Streams[streamID] - var currentURL = strings.Trim(segment.URL, "\r\n") + if stream.Status == false { + + if strings.Index(stream.URL, ".m3u8") != -1 { + showInfo("Streaming Type:" + "[HLS / M3U8]") + } else { + showInfo("Streaming Type:" + "[TS]") + } + + showInfo("Streaming URL:" + stream.URL) - if len(currentURL) == 0 { - goto InitBuffer } - debug = fmt.Sprintf("Connection to:%s", currentURL) - showDebug(debug, 2) + var s = 0 - // Sprung für Redirect (301 <---> 308) - Redirect: + stream.TimeStart = time.Now() + bandwidth.Start = stream.TimeStart + bandwidth.Size = 0 - req, err := http.NewRequest("GET", currentURL, nil) - req.Header.Set("User-Agent", Settings.UserAgent) - req.Header.Set("Connection", "close") - //req.Header.Set("Range", "bytes=0-") - req.Header.Set("Accept", "*/*") - debugRequest(req) - - client := &http.Client{} - client.CheckRedirect = func(req *http.Request, via []*http.Request) error { - return errors.New("Redirect") - } - - resp, err := client.Do(req) - - if resp != nil && err != nil { - debugResponse(resp) - } - - if err != nil { - - if resp == nil { - - err = errors.New("No response from streaming server") - fmt.Println("Current URL:", currentURL) - ShowError(err, 0) - - addErrorToStream(err) - - killClientConnection(streamID, stream.PlaylistID, true) - clientConnection(stream) + for { + if clientConnection(stream) == false { return } - // Redirect - if resp.StatusCode >= 301 && resp.StatusCode <= 308 { + if len(stream.Segment) == 0 || len(stream.URL) == 0 { + goto InitBuffer + } - debug = fmt.Sprintf("Streaming Status:HTTP response status [%d] %s", resp.StatusCode, http.StatusText(resp.StatusCode)) - showDebug(debug, 2) + var segment = stream.Segment[0] - currentURL = strings.Trim(resp.Header.Get("Location"), "\r\n") + var currentURL = strings.Trim(segment.URL, "\r\n") - stream.Location = currentURL + if len(currentURL) == 0 { + goto InitBuffer + } - if len(currentURL) > 0 { + debug = fmt.Sprintf("Connection to:%s", currentURL) + showDebug(debug, 2) - debug = fmt.Sprintf("HTTP Redirect:%s", stream.Location) + // Sprung für Redirect (301 <---> 308) + Redirect: + + req, err := http.NewRequest("GET", currentURL, nil) + req.Header.Set("User-Agent", Settings.UserAgent) + req.Header.Set("Connection", "close") + //req.Header.Set("Range", "bytes=0-") + req.Header.Set("Accept", "*/*") + debugRequest(req) + + client := &http.Client{} + client.CheckRedirect = func(req *http.Request, via []*http.Request) error { + return errors.New("Redirect") + } + + resp, err := client.Do(req) + + if resp != nil && err != nil { + debugResponse(resp) + } + + if err != nil { + + if resp == nil { + + err = errors.New("No response from streaming server") + fmt.Println("Current URL:", currentURL) + ShowError(err, 0) + + addErrorToStream(err) + + killClientConnection(streamID, playlistID, true) + clientConnection(stream) + + return + } + + // Redirect + if resp.StatusCode >= 301 && resp.StatusCode <= 308 { + + debug = fmt.Sprintf("Streaming Status:HTTP response status [%d] %s", resp.StatusCode, http.StatusText(resp.StatusCode)) showDebug(debug, 2) - defer resp.Body.Close() - goto Redirect + + currentURL = strings.Trim(resp.Header.Get("Location"), "\r\n") + + stream.Location = currentURL + + if len(currentURL) > 0 { + + debug = fmt.Sprintf("HTTP Redirect:%s", stream.Location) + showDebug(debug, 2) + defer resp.Body.Close() + goto Redirect + + } else { + + err = errors.New("Streaming server") + ShowError(err, 4002) + addErrorToStream(err) + + defer resp.Body.Close() + + return + + } } else { - err = errors.New("Streaming server") - ShowError(err, 4002) + ShowError(err, 0) addErrorToStream(err) defer resp.Body.Close() @@ -685,290 +706,232 @@ InitBuffer: } - } else { - - ShowError(err, 0) - addErrorToStream(err) - defer resp.Body.Close() - return - } defer resp.Body.Close() - } + // HTTP Status überprüfen, bei Fehlern wird der Stream beendet + var contentType = resp.Header.Get("Content-Type") + var httpStatusCode = resp.StatusCode + var httpStatusInfo = fmt.Sprintf("HTTP Response Status [%d] %s", httpStatusCode, http.StatusText(resp.StatusCode)) - defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { - // HTTP Status überprüfen, bei Fehlern wird der Stream beendet - var contentType = resp.Header.Get("Content-Type") - var httpStatusCode = resp.StatusCode - var httpStatusInfo = fmt.Sprintf("HTTP Response Status [%d] %s", httpStatusCode, http.StatusText(resp.StatusCode)) + showInfo("Content Type:" + contentType) + showInfo("Streaming Status:" + httpStatusInfo) + showInfo("Error with this URL:" + currentURL) - if resp.StatusCode != http.StatusOK { + var err = errors.New(http.StatusText(resp.StatusCode)) + ShowError(err, 4004) - showInfo("Content Type:" + contentType) - showInfo("Streaming Status:" + httpStatusInfo) - showInfo("Error with this URL:" + currentURL) + debug = fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner) + showDebug(debug, 1) - var err = errors.New(http.StatusText(resp.StatusCode)) - ShowError(err, 4004) - - debug = fmt.Sprintf("Streaming Status:Playlist: %s - Tuner: %d / %d", playlist.PlaylistName, len(playlist.Streams), playlist.Tuner) - showDebug(debug, 1) - - BufferInformation.Store(playlist.PlaylistID, playlist) - addErrorToStream(err) - - killClientConnection(streamID, stream.PlaylistID, true) - clientConnection(stream) - resp.Body.Close() - - return - } - - // Informationen über den Streamingserver auslesen - if stream.Status == false { - - if len(stream.URLStreamingServer) == 0 { - - u, _ := url.Parse(currentURL) - p, _ := url.Parse(currentURL) - - stream.URLScheme = u.Scheme - stream.URLHost = req.Host - stream.URLPath = p.Path - stream.URLFile = path.Base(p.Path) - - stream.URLRedirect = fmt.Sprintf("%s://%s%s", stream.URLScheme, stream.URLHost, stream.URLPath) - stream.URLStreamingServer = fmt.Sprintf("%s://%s", stream.URLScheme, stream.URLHost) - - } - - debug = fmt.Sprintf("Server URL:%s", stream.URLStreamingServer) - showDebug(debug, 1) - - debug = fmt.Sprintf("Temp Folder:%s", tmpFolder) - showDebug(debug, 1) - - showInfo("Streaming Status:" + "HTTP Response Status [" + strconv.Itoa(resp.StatusCode) + "] " + http.StatusText(resp.StatusCode)) - showInfo("Content Type:" + contentType) - - } else { - - debug = fmt.Sprintf("Content Type:%s", contentType) - showDebug(debug, 2) - - } - - // Content Type bereinigen - if len(contentType) > 0 { - var ct = strings.SplitN(contentType, ";", 2) - contentType = strings.ToLower(ct[0]) - } - - switch contentType { - - // M3U8 Playlist - case "application/x-mpegurl", "application/vnd.apple.mpegurl", "audio/mpegurl": - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - ShowError(err, 0) + BufferInformation.Store(playlist.PlaylistID, playlist) addErrorToStream(err) - } - stream.Body = string(body) - stream.HLS = true - stream.M3U8URL = currentURL - - err = parseM3U8(&stream) - if err != nil { - ShowError(err, 4050) - addErrorToStream(err) - } - - // Video Stream (TS) - case "video/mpeg", "video/mp4", "video/mp2t", "video/m2ts", "application/octet-stream", "binary/octet-stream", "application/mp2t": - - var fileSize int - - // Größe des Buffers - buffer := make([]byte, 1024*Settings.BufferSize*2) - var tmpFileSize = 1024 * Settings.BufferSize * 1 - - debug = fmt.Sprintf("Buffer Size:%d KB [SERVER CONNECTION]", len(buffer)/1024) - showDebug(debug, 3) - - debug = fmt.Sprintf("Buffer Size:%d KB [CLIENT CONNECTION]", tmpFileSize/1024) - showDebug(debug, 3) - - var tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment) - - if clientConnection(stream) == false { + killClientConnection(streamID, playlistID, true) + clientConnection(stream) resp.Body.Close() + return } - bufferFile, err := os.Create(tmpFile) - if err != nil { + // Informationen über den Streamingserver auslesen + if stream.Status == false { - addErrorToStream(err) - bufferFile.Close() - resp.Body.Close() - return + if len(stream.URLStreamingServer) == 0 { + + u, _ := url.Parse(currentURL) + p, _ := url.Parse(currentURL) + + stream.URLScheme = u.Scheme + stream.URLHost = req.Host + stream.URLPath = p.Path + stream.URLFile = path.Base(p.Path) + + stream.URLRedirect = fmt.Sprintf("%s://%s%s", stream.URLScheme, stream.URLHost, stream.URLPath) + stream.URLStreamingServer = fmt.Sprintf("%s://%s", stream.URLScheme, stream.URLHost) + + } + + debug = fmt.Sprintf("Server URL:%s", stream.URLStreamingServer) + showDebug(debug, 1) + + debug = fmt.Sprintf("Temp Folder:%s", tmpFolder) + showDebug(debug, 1) + + showInfo("Streaming Status:" + "HTTP Response Status [" + strconv.Itoa(resp.StatusCode) + "] " + http.StatusText(resp.StatusCode)) + showInfo("Content Type:" + contentType) + + } else { + + debug = fmt.Sprintf("Content Type:%s", contentType) + showDebug(debug, 2) } - for { + // Content Type bereinigen + if len(contentType) > 0 { + var ct = strings.SplitN(contentType, ";", 2) + contentType = strings.ToLower(ct[0]) + } - if fileSize == 0 { - - debug = fmt.Sprintf("Buffer Status:Buffering (%s)", tmpFile) - showDebug(debug, 2) - - } - - timeOut = 0 - // Buffer mit Daten vom Server füllen - n, err := resp.Body.Read(buffer) - - if err != nil && err != io.EOF { + switch contentType { + // M3U8 Playlist + case "application/x-mpegurl", "application/vnd.apple.mpegurl", "audio/mpegurl": + body, err := ioutil.ReadAll(resp.Body) + if err != nil { ShowError(err, 0) addErrorToStream(err) - resp.Body.Close() - return - } - defer resp.Body.Close() + stream.Body = string(body) + stream.HLS = true + stream.M3U8URL = currentURL - if _, err := bufferFile.Write(buffer[:n]); err != nil { - - ShowError(err, 0) + err = parseM3U8(&stream) + if err != nil { + ShowError(err, 4050) addErrorToStream(err) - resp.Body.Close() - return - } - defer bufferFile.Close() + // Video Stream (TS) + case "video/mpeg", "video/mp4", "video/mp2t", "video/m2ts", "application/octet-stream", "binary/octet-stream", "application/mp2t": - fileSize = fileSize + n + var fileSize int + + // Größe des Buffers + buffer := make([]byte, 1024*Settings.BufferSize*2) + var tmpFileSize = 1024 * Settings.BufferSize * 1 + + debug = fmt.Sprintf("Buffer Size:%d KB [SERVER CONNECTION]", len(buffer)/1024) + showDebug(debug, 3) + + debug = fmt.Sprintf("Buffer Size:%d KB [CLIENT CONNECTION]", tmpFileSize/1024) + showDebug(debug, 3) + + var tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment) if clientConnection(stream) == false { - resp.Body.Close() - bufferFile.Close() + return + } - err = os.RemoveAll(stream.Folder) - if err != nil { - ShowError(err, 4005) - } + bufferFile, err := os.Create(tmpFile) + if err != nil { + + addErrorToStream(err) + bufferFile.Close() + resp.Body.Close() return } - // Buffer auf die Festplatte speichern - if fileSize >= tmpFileSize || n == 0 { + for { - bandwidth.Stop = time.Now() - bandwidth.Size += fileSize + if fileSize == 0 { - bandwidth.TimeDiff = bandwidth.Stop.Sub(bandwidth.Start).Seconds() + debug = fmt.Sprintf("Buffer Status:Buffering (%s)", tmpFile) + showDebug(debug, 2) - networkBandwidth = int(float64(bandwidth.Size) / bandwidth.TimeDiff * 1000) + } - stream.NetworkBandwidth = networkBandwidth - bandwidth.NetworkBandwidth = stream.NetworkBandwidth + timeOut = 0 + // Buffer mit Daten vom Server füllen + n, err := resp.Body.Read(buffer) - debug = fmt.Sprintf("Buffer Status:Done (%s)", tmpFile) - showDebug(debug, 2) + if err != nil && err != io.EOF { - bufferFile.Close() + ShowError(err, 0) + addErrorToStream(err) + resp.Body.Close() + return - stream.Status = true - playlist.Streams[streamID] = stream - tmpSegment++ + } - tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment) + defer resp.Body.Close() + + if _, err := bufferFile.Write(buffer[:n]); err != nil { + + ShowError(err, 0) + addErrorToStream(err) + resp.Body.Close() + return + + } + + defer bufferFile.Close() + + fileSize = fileSize + n if clientConnection(stream) == false { - bufferFile.Close() resp.Body.Close() + bufferFile.Close() err = os.RemoveAll(stream.Folder) if err != nil { ShowError(err, 4005) } - return + } - bufferFile, err = os.Create(tmpFile) - if err != nil { - addErrorToStream(err) - resp.Body.Close() - return - } + // Buffer auf die Festplatte speichern + if fileSize >= tmpFileSize || n == 0 { - fileSize = 0 + bandwidth.Stop = time.Now() + bandwidth.Size += fileSize + + bandwidth.TimeDiff = bandwidth.Stop.Sub(bandwidth.Start).Seconds() + + networkBandwidth = int(float64(bandwidth.Size) / bandwidth.TimeDiff * 1000) + + stream.NetworkBandwidth = networkBandwidth + bandwidth.NetworkBandwidth = stream.NetworkBandwidth + + debug = fmt.Sprintf("Buffer Status:Done (%s)", tmpFile) + showDebug(debug, 2) - if n == 0 { bufferFile.Close() - resp.Body.Close() - break - } - } + stream.Status = true + playlist.Streams[streamID] = stream + BufferInformation.Store(playlistID, playlist) - } + tmpSegment++ - //-- + tmpFile = fmt.Sprintf("%s%d.ts", tmpFolder, tmpSegment) - // Umbekanntes Format - default: - showInfo("Content Type:" + resp.Header.Get("Content-Type")) - err = errors.New("Streaming error") - ShowError(err, 4003) + if clientConnection(stream) == false { - addErrorToStream(err) - resp.Body.Close() - return - } + bufferFile.Close() + resp.Body.Close() - s++ + err = os.RemoveAll(stream.Folder) + if err != nil { + ShowError(err, 4005) + } - // Wartezeit für den Download das nächste Segments berechnen - if stream.HLS == true { + return + } - var sleep float64 + bufferFile, err = os.Create(tmpFile) + if err != nil { + addErrorToStream(err) + resp.Body.Close() + return + } - if segment.Duration > 0 { + fileSize = 0 - stream.TimeEnd = time.Now() - stream.TimeDiff = stream.TimeEnd.Sub(stream.TimeStart).Seconds() - - sleep = (segment.Duration - stream.TimeDiff) - (segment.Duration * 0.25) - - if sleep < 0 { - sleep = 0 - } - - debug = fmt.Sprintf("HLS Status:Download time: %f s | Segment duration: %f s | Sleep: %f s Sequence: %d", stream.TimeDiff, segment.Duration, sleep, segment.Sequence) - showDebug(debug, 1) - - if sleep > 0 { - - for i := 0.0; i < sleep*1000; i = i + 100 { - - _ = i - time.Sleep(time.Duration(100) * time.Millisecond) - - if _, err := os.Stat(stream.Folder); os.IsNotExist(err) { + if n == 0 { + bufferFile.Close() + resp.Body.Close() break } @@ -976,15 +939,66 @@ InitBuffer: } + //-- + + // Umbekanntes Format + default: + showInfo("Content Type:" + resp.Header.Get("Content-Type")) + err = errors.New("Streaming error") + ShowError(err, 4003) + + addErrorToStream(err) + resp.Body.Close() + return } - } + s++ - stream.Segment = stream.Segment[1:len(stream.Segment)] + // Wartezeit für den Download das nächste Segments berechnen + if stream.HLS == true { - resp.Body.Close() + var sleep float64 - } // Ende for loop + if segment.Duration > 0 { + + stream.TimeEnd = time.Now() + stream.TimeDiff = stream.TimeEnd.Sub(stream.TimeStart).Seconds() + + sleep = (segment.Duration - stream.TimeDiff) - (segment.Duration * 0.25) + + if sleep < 0 { + sleep = 0 + } + + debug = fmt.Sprintf("HLS Status:Download time: %f s | Segment duration: %f s | Sleep: %f s Sequence: %d", stream.TimeDiff, segment.Duration, sleep, segment.Sequence) + showDebug(debug, 1) + + if sleep > 0 { + + for i := 0.0; i < sleep*1000; i = i + 100 { + + _ = i + time.Sleep(time.Duration(100) * time.Millisecond) + + if _, err := os.Stat(stream.Folder); os.IsNotExist(err) { + break + } + + } + + } + + } + + } + + stream.Segment = stream.Segment[1:len(stream.Segment)] + + resp.Body.Close() + + } // Ende for loop + + } // Ende BufferInformation } diff --git a/src/config.go b/src/config.go index 27115d0..23796f1 100644 --- a/src/config.go +++ b/src/config.go @@ -109,7 +109,7 @@ func Init() (err error) { // Überprüfen ob xTeVe als root läuft if os.Geteuid() == 0 { - showWarning(2010) + showWarning(2110) } if System.Flag.Debug > 0 { diff --git a/src/data.go b/src/data.go index 852aa55..11e519d 100644 --- a/src/data.go +++ b/src/data.go @@ -397,17 +397,25 @@ func saveFilter(request RequestStruct) (settings SettingsStrcut, err error) { // Filter aktualisieren / löschen for key, value := range data.(map[string]interface{}) { - var oldData = filterMap[dataID].(map[string]interface{}) - oldData[key] = value - // Filter löschen if _, ok := data.(map[string]interface{})["delete"]; ok { - delete(filterMap, dataID) break + } + + if filter, ok := data.(map[string]interface{})["filter"].(string); ok { + + if len(filter) == 0 { + err = errors.New(getErrMsg(1014)) + delete(filterMap, dataID) + return + } } + var oldData = filterMap[dataID].(map[string]interface{}) + oldData[key] = value + } } @@ -446,8 +454,39 @@ func saveXEpgMapping(request RequestStruct) (err error) { Data.XEPG.Channels = request.EpgMapping - cleanupXEPG() - buildXEPG(true) + if System.ScanInProgress == 0 { + + cleanupXEPG() + buildXEPG(true) + + } else { + + // Wenn während des erstellen der Datanbank das Mapping erneut gespeichert wird, wird die Datenbank erst später erneut aktualisiert. + go func() { + + if System.BackgroundProcess == true { + return + } + + System.BackgroundProcess = true + + for { + time.Sleep(time.Duration(1) * time.Second) + if System.ScanInProgress == 0 { + break + } + + } + + cleanupXEPG() + buildXEPG(false) + + System.BackgroundProcess = false + + }() + + } + return } @@ -612,6 +651,7 @@ func saveWizard(request RequestStruct) (nextStep int, err error) { } buildXEPG(false) + System.ScanInProgress = 0 } diff --git a/src/hdhr.go b/src/hdhr.go index b39e516..830401a 100644 --- a/src/hdhr.go +++ b/src/hdhr.go @@ -95,7 +95,7 @@ func getLineupStatus() (jsonContent []byte, err error) { lineupStatus.ScanInProgress = System.ScanInProgress lineupStatus.ScanPossible = 0 lineupStatus.Source = "Cable" - lineupStatus.SourceList = []string{"IPTV", "Cable"} + lineupStatus.SourceList = []string{"Cable"} jsonContent, err = json.MarshalIndent(lineupStatus, "", " ") diff --git a/src/internal/m3u-parser/xteve_m3uParser.go b/src/internal/m3u-parser/xteve_m3uParser.go index fa191cd..476d05e 100755 --- a/src/internal/m3u-parser/xteve_m3uParser.go +++ b/src/internal/m3u-parser/xteve_m3uParser.go @@ -2,6 +2,8 @@ package m3u import ( "errors" + "fmt" + "log" "net/url" "regexp" "strings" @@ -12,6 +14,7 @@ func MakeInterfaceFromM3U(byteStream []byte) (allChannels []interface{}, err err var content = string(byteStream) var channelName string + var uuids []string var parseMetaData = func(channel string) (stream map[string]string) { @@ -53,7 +56,7 @@ func MakeInterfaceFromM3U(byteStream []byte) (allChannels []interface{}, err err line = strings.Replace(line, p, "", 1) p = strings.Replace(p, `"`, "", -1) - var parameter = strings.Split(p, "=") + var parameter = strings.SplitN(p, "=", 2) if len(parameter) == 2 { @@ -120,9 +123,15 @@ func MakeInterfaceFromM3U(byteStream []byte) (allChannels []interface{}, err err if strings.Contains(strings.ToLower(key), "id") { + if indexOfString(value, uuids) != -1 { + log.Println(fmt.Sprintf("Channel: %s - %s = %s ", stream["name"], key, value)) + break + } + + uuids = append(uuids, value) + stream["_uuid.key"] = key stream["_uuid.value"] = value - //os.Exit(0) break } @@ -160,3 +169,14 @@ func MakeInterfaceFromM3U(byteStream []byte) (allChannels []interface{}, err err return } + +func indexOfString(element string, data []string) int { + + for k, v := range data { + if element == v { + return k + } + } + + return -1 +} diff --git a/src/m3u.go b/src/m3u.go index 84066ea..d513ac0 100644 --- a/src/m3u.go +++ b/src/m3u.go @@ -43,6 +43,10 @@ func filterThisStream(s interface{}) (status bool) { for _, filter := range Data.Filter { + if filter.Rule == "" { + continue + } + var group, name, search string var exclude, include string var match = false diff --git a/src/screen.go b/src/screen.go index 8822d09..29acf2c 100644 --- a/src/screen.go +++ b/src/screen.go @@ -245,6 +245,8 @@ func getErrMsg(errCode int) (errMsg string) { errMsg = fmt.Sprintf("Invalid formatting of the time") case 1013: errMsg = fmt.Sprintf("Invalid settings file (settings.json), file must be at least version %s", System.Compatibility) + case 1014: + errMsg = fmt.Sprintf("Invalid filter rule") case 1020: errMsg = fmt.Sprintf("Data could not be saved, invalid keyword") diff --git a/src/struct-system.go b/src/struct-system.go index d180ff8..b246700 100644 --- a/src/struct-system.go +++ b/src/struct-system.go @@ -11,6 +11,7 @@ type SystemStruct struct { APIVersion string AppName string ARCH string + BackgroundProcess bool Branch string Build string Compatibility string diff --git a/src/xepg.go b/src/xepg.go index 2a19bbb..415439f 100644 --- a/src/xepg.go +++ b/src/xepg.go @@ -306,7 +306,6 @@ func createXEPGDatabase() (err error) { } if len(xepgChannel.XChannelID) == 0 { - fmt.Println(mapToJSON(xepgChannel)) delete(Data.XEPG.Channels, id) } @@ -316,6 +315,12 @@ func createXEPGDatabase() (err error) { } + var xepgChannels = make(map[string]interface{}) + + for k, v := range Data.XEPG.Channels { + xepgChannels[k] = v + } + for _, dsa := range Data.Streams.Active { var channelExists = false // Entscheidet ob ein Kanal neu zu Datenbank hinzugefügt werden soll. @@ -331,7 +336,7 @@ func createXEPGDatabase() (err error) { Data.Cache.Streams.Active = append(Data.Cache.Streams.Active, m3uChannel.Name) // XEPG Datenbank durchlaufen um nach dem Kanal zu suchen. - for xepg, dxc := range Data.XEPG.Channels { + for xepg, dxc := range xepgChannels { var xepgChannel XEPGChannelStruct err = json.Unmarshal([]byte(mapToJSON(dxc)), &xepgChannel) @@ -367,6 +372,7 @@ func createXEPGDatabase() (err error) { //os.Exit(0) switch channelExists { + case true: // Bereits vorhandener Kanal var xepgChannel XEPGChannelStruct diff --git a/xteve.go b/xteve.go index 5ddec50..bc50098 100644 --- a/xteve.go +++ b/xteve.go @@ -39,7 +39,7 @@ var GitHub = GitHubStruct{Branch: "master", User: "xteve-project", Repo: "xTeVe- const Name = "xTeVe" // Version : Version, die Build Nummer wird in der main func geparst. -const Version = "2.0.2.0020" +const Version = "2.0.2.0023" // DBVersion : Datanbank Version const DBVersion = "2.0.0"