张延森 2 лет назад
Родитель
Сommit
8bb164a75d

Двоичные данные
db/ybai.db Просмотреть файл


+ 5
- 0
pom.xml Просмотреть файл

@@ -26,6 +26,11 @@
26 26
 			<artifactId>fastjson</artifactId>
27 27
 			<version>1.2.4</version>
28 28
 		</dependency>
29
+		<dependency>
30
+			<groupId>io.netty</groupId>
31
+			<artifactId>netty-all</artifactId>
32
+			<version>4.1.55.Final</version>
33
+		</dependency>
29 34
 		<!--mybaits-->
30 35
 		<dependency>
31 36
 			<groupId>org.mybatis.spring.boot</groupId>

+ 7
- 4
src/main/java/com/yunzhi/inte/InteApplication.java Просмотреть файл

@@ -1,15 +1,18 @@
1 1
 package com.yunzhi.inte;
2 2
 
3
-import org.mybatis.spring.annotation.MapperScan;
3
+import com.yunzhi.inte.rtsp.MonitorServer;
4 4
 import org.springframework.boot.SpringApplication;
5
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
6 5
 import org.springframework.boot.autoconfigure.SpringBootApplication;
6
+import org.springframework.context.ConfigurableApplicationContext;
7 7
 
8 8
 @SpringBootApplication
9 9
 public class InteApplication {
10
+	public static ConfigurableApplicationContext ac;
10 11
 
11 12
 	public static void main(String[] args) {
12
-		SpringApplication.run(InteApplication.class, args);
13
-	}
13
+		ac = SpringApplication.run(InteApplication.class, args);
14
+		System.out.println("------------");
14 15
 
16
+//		ac.getBean(MonitorServer.class).start();
17
+	}
15 18
 }

+ 5
- 3
src/main/java/com/yunzhi/inte/controller/MonitorController.java Просмотреть файл

@@ -30,11 +30,13 @@ public class MonitorController {
30 30
      */
31 31
     @RequestMapping(value="/monitor",method= RequestMethod.GET)
32 32
     public ResponseBean MonitorPage(@RequestParam(value ="pageNum",defaultValue = "1") Integer pageNum,
33
-                                   @RequestParam(value ="pageSize",defaultValue = "10") Integer pageSize) throws Exception{
33
+                                   @RequestParam(value ="pageSize",defaultValue = "10") Integer pageSize,
34
+                                    @RequestParam(value ="status", required = false) String status) throws Exception{
34 35
 
35 36
         IPage<Monitor> pg = new Page<>(pageNum, pageSize);
36
-//        QueryWrapper<Info> queryWrapper = new QueryWrapper<>();
37
-//        queryWrapper.orderByDesc("qz");
37
+        QueryWrapper<Monitor> queryWrapper = new QueryWrapper<>();
38
+        queryWrapper.eq(null != status && !"".equals(status), "status", status);
39
+        queryWrapper.orderByDesc("id");
38 40
 
39 41
         IPage<Monitor> result = monitorService.page(pg);
40 42
         return ResponseBean.success(result);

+ 72
- 0
src/main/java/com/yunzhi/inte/rtsp/ChannelGroupHolder.java Просмотреть файл

@@ -0,0 +1,72 @@
1
+package com.yunzhi.inte.rtsp;
2
+
3
+import io.netty.channel.Channel;
4
+import io.netty.channel.ChannelId;
5
+import io.netty.channel.group.ChannelGroup;
6
+import io.netty.channel.group.DefaultChannelGroup;
7
+import io.netty.util.concurrent.ImmediateEventExecutor;
8
+import lombok.extern.slf4j.Slf4j;
9
+
10
+import java.util.HashMap;
11
+import java.util.Map;
12
+
13
+/**
14
+ * 保存群组
15
+ */
16
+@Slf4j
17
+public class ChannelGroupHolder {
18
+
19
+    private static Map<String, ChannelGroup> channelGroups = new HashMap<>();
20
+    private static Map<String, String> channelTagMapping = new HashMap<>();
21
+
22
+    public static Map<String, ChannelGroup> getChannelGroups() {
23
+        return channelGroups;
24
+    }
25
+
26
+    /**
27
+     * 添加channel到群组
28
+     */
29
+    public static synchronized void add(String tag, Channel channel) {
30
+        ChannelGroup group = channelGroups.get(tag);
31
+        if (group == null) {
32
+            group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
33
+            log.info("创建新群组:{}", tag);
34
+            channelGroups.put(tag, group);
35
+        }
36
+        group.add(channel);
37
+        ChannelId channelId = channel.id();
38
+        channelTagMapping.put(channelId.toString(), tag);
39
+        log.info("群组{}加入新channel:{}", tag, channelId);
40
+    }
41
+
42
+    /**
43
+     * 根据IP获取群组
44
+     */
45
+    public static ChannelGroup get(String tag) {
46
+        return channelGroups.get(tag);
47
+    }
48
+
49
+    /**
50
+     * 移除指定channel,如果群组变为空,返回对应IP
51
+     */
52
+    public static String remove(Channel channel) {
53
+        String channelId = channel.id().toString();
54
+        String tag = channelTagMapping.get(channelId);
55
+        channelTagMapping.remove(channelId);
56
+        if (tag != null && !channelTagMapping.containsValue(tag)) {
57
+            log.info("移除channel:{},所在群组:{}", channelId, tag);
58
+            return tag;
59
+        }
60
+        return null;
61
+    }
62
+
63
+    /**
64
+     * 关闭所有群组
65
+     */
66
+    public static void close() {
67
+        for (ChannelGroup group : channelGroups.values()) {
68
+            group.close();
69
+        }
70
+    }
71
+
72
+}

+ 26
- 0
src/main/java/com/yunzhi/inte/rtsp/FFmpeg.java Просмотреть файл

@@ -0,0 +1,26 @@
1
+package com.yunzhi.inte.rtsp;
2
+
3
+import lombok.extern.slf4j.Slf4j;
4
+import org.springframework.beans.factory.annotation.Value;
5
+import org.springframework.stereotype.Component;
6
+
7
+import java.io.IOException;
8
+
9
+@Slf4j
10
+@Component
11
+public class FFmpeg {
12
+    @Value("${my.minitor.port}")
13
+    Integer port;
14
+
15
+    @Value("${my.minitor.ffmpeg}")
16
+    String ffmpegBin;
17
+
18
+    public Process run(String source) throws IOException {
19
+        String command = ffmpegBin + " -f rtsp -rtsp_transport tcp -i " + source
20
+                + " -strict -2 -c:v libx264 -vsync 2 -c:a aac "
21
+                + "-f mpegts -codec:v mpeg1video -s 640x480 -b:v 1000k -bf 0 http://127.0.0.1:" + port
22
+                + "/push/" + source;
23
+        log.info(command);
24
+        return Runtime.getRuntime().exec(command);
25
+    }
26
+}

+ 107
- 0
src/main/java/com/yunzhi/inte/rtsp/HttpRequestHandler.java Просмотреть файл

@@ -0,0 +1,107 @@
1
+package com.yunzhi.inte.rtsp;
2
+
3
+import io.netty.buffer.ByteBuf;
4
+import io.netty.buffer.Unpooled;
5
+import io.netty.channel.ChannelFuture;
6
+import io.netty.channel.ChannelFutureListener;
7
+import io.netty.channel.ChannelHandlerContext;
8
+import io.netty.channel.SimpleChannelInboundHandler;
9
+import io.netty.channel.group.ChannelGroup;
10
+import io.netty.channel.group.ChannelGroupFuture;
11
+import io.netty.handler.codec.http.*;
12
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
13
+import lombok.extern.slf4j.Slf4j;
14
+
15
+import java.io.File;
16
+import java.io.IOException;
17
+import java.io.RandomAccessFile;
18
+
19
+/**
20
+ * 处理HTTP请求
21
+ */
22
+@Slf4j
23
+@SuppressWarnings("deprecation")
24
+public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject> {
25
+    private final String wsUri;
26
+    private String monitorIp;
27
+
28
+    public HttpRequestHandler(String wsUri) {
29
+        this.wsUri = wsUri;
30
+    }
31
+
32
+    @Override
33
+    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
34
+        if (msg instanceof HttpRequest) {
35
+            HttpRequest request = (HttpRequest) msg;
36
+            String uri = request.getUri();
37
+            log.info("收到HTTP请求:{}", uri);
38
+
39
+            // 处理WebSocket请求
40
+            if (uri.contains(wsUri)) {
41
+                String ip = uri.split(wsUri + "/")[1];
42
+                ChannelGroupHolder.add(ip, ctx.channel());
43
+                TransferRtspHelper.open(ip);
44
+                ctx.fireChannelRead(new DefaultFullHttpRequest(request.protocolVersion(), request.method(), wsUri,
45
+                        Unpooled.buffer(), request.headers(), request.headers()));
46
+            } else if (uri.contains("/push/")) {
47
+                monitorIp = uri.split("/push/")[1];
48
+                log.info("开始接收转码推流:{}", monitorIp);
49
+            }
50
+            // 处理HTTP请求
51
+            else {
52
+                FullHttpResponse response = null;
53
+                response = generateResponse(request, null);
54
+                ctx.write(response);
55
+                ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
56
+                if (!HttpHeaders.isKeepAlive(request)) {
57
+                    future.addListener(ChannelFutureListener.CLOSE);
58
+                }
59
+            }
60
+        } else if (msg instanceof HttpContent && !(msg instanceof LastHttpContent)) {
61
+            HttpContent chunk = (HttpContent) msg;
62
+            ChannelGroup group = ChannelGroupHolder.get(monitorIp);
63
+            ChannelGroupFuture groupFuture = group.writeAndFlush(new BinaryWebSocketFrame(chunk.retain().content()));
64
+            groupFuture.sync();
65
+        }
66
+    }
67
+
68
+    @Override
69
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
70
+        cause.printStackTrace();
71
+        ctx.close();
72
+    }
73
+
74
+    /**
75
+     * 生成HTTP响应
76
+     */
77
+    private FullHttpResponse generateResponse(HttpRequest request, RandomAccessFile file) throws IOException {
78
+        ByteBuf buf = generateByteBuf(file);
79
+        FullHttpResponse response = new DefaultFullHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK,
80
+                buf);
81
+        if (HttpHeaders.isKeepAlive(request)) {
82
+            response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
83
+        }
84
+        response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, buf.writerIndex());
85
+        return response;
86
+    }
87
+
88
+    /**
89
+     * 生成HTTP响应内容
90
+     */
91
+    private ByteBuf generateByteBuf(RandomAccessFile file) throws IOException {
92
+        ByteBuf buf = Unpooled.buffer(0);
93
+        if (file != null) {
94
+            int length = (int) file.length();
95
+            byte[] fileByte = new byte[length];
96
+            file.read(fileByte);
97
+            file.close();
98
+
99
+            buf = Unpooled.buffer(length);
100
+            for (int i = 0; i < length; i++) {
101
+                buf.writeByte(fileByte[i]);
102
+            }
103
+        }
104
+        return buf;
105
+    }
106
+
107
+}

+ 54
- 0
src/main/java/com/yunzhi/inte/rtsp/MonitorServer.java Просмотреть файл

@@ -0,0 +1,54 @@
1
+package com.yunzhi.inte.rtsp;
2
+
3
+import io.netty.bootstrap.ServerBootstrap;
4
+import io.netty.channel.Channel;
5
+import io.netty.channel.EventLoopGroup;
6
+import io.netty.channel.nio.NioEventLoopGroup;
7
+import io.netty.channel.socket.nio.NioServerSocketChannel;
8
+import lombok.extern.slf4j.Slf4j;
9
+import org.springframework.beans.factory.annotation.Autowired;
10
+import org.springframework.beans.factory.annotation.Value;
11
+import org.springframework.stereotype.Component;
12
+
13
+import javax.annotation.PostConstruct;
14
+
15
+/**
16
+ * 引导服务器
17
+ */
18
+@Slf4j
19
+@Component
20
+public class MonitorServer extends Thread {
21
+    @Value("${my.minitor.port}")
22
+    Integer port;
23
+
24
+    @Autowired
25
+    FFmpeg fFmpeg;
26
+
27
+    public void run() {
28
+        TransferRtspHelper.TransferTask.fFmpeg = fFmpeg;
29
+        EventLoopGroup group = new NioEventLoopGroup();
30
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
31
+
32
+        ServerBootstrap bootstrap = new ServerBootstrap();
33
+        bootstrap.group(group, workerGroup).channel(NioServerSocketChannel.class)
34
+                .childHandler(new MonitorServerInitializer());
35
+
36
+        try {
37
+            Channel channel = bootstrap.bind(port).sync().channel();
38
+
39
+            TransferRtspHelper.schedule();
40
+            log.info("服务器开始运行......[:{}]", port);
41
+
42
+            channel.closeFuture().sync();
43
+        } catch (InterruptedException e) {
44
+            e.printStackTrace();
45
+        }
46
+
47
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
48
+            group.shutdownGracefully();
49
+            workerGroup.shutdownGracefully();
50
+            ChannelGroupHolder.close();
51
+            log.info("服务器已关闭");
52
+        }));
53
+    }
54
+}

+ 24
- 0
src/main/java/com/yunzhi/inte/rtsp/MonitorServerInitializer.java Просмотреть файл

@@ -0,0 +1,24 @@
1
+package com.yunzhi.inte.rtsp;
2
+
3
+import io.netty.channel.Channel;
4
+import io.netty.channel.ChannelInitializer;
5
+import io.netty.channel.ChannelPipeline;
6
+import io.netty.handler.codec.http.HttpContentCompressor;
7
+import io.netty.handler.codec.http.HttpServerCodec;
8
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
9
+
10
+/**
11
+ * 初始化ChannelPipeline
12
+ */
13
+public class MonitorServerInitializer extends ChannelInitializer<Channel> {
14
+
15
+    @Override
16
+    public void initChannel(Channel ch) {
17
+        ChannelPipeline pipeline = ch.pipeline();
18
+        pipeline.addLast(new HttpServerCodec());
19
+        pipeline.addLast(new HttpContentCompressor());
20
+        pipeline.addLast(new HttpRequestHandler("/ws"));
21
+        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
22
+        pipeline.addLast(new WebSocketHandler());
23
+    }
24
+}

+ 147
- 0
src/main/java/com/yunzhi/inte/rtsp/TransferRtspHelper.java Просмотреть файл

@@ -0,0 +1,147 @@
1
+package com.yunzhi.inte.rtsp;
2
+
3
+import com.yunzhi.inte.InteApplication;
4
+import io.netty.channel.group.ChannelGroup;
5
+import lombok.AllArgsConstructor;
6
+import lombok.Data;
7
+import lombok.NoArgsConstructor;
8
+import lombok.SneakyThrows;
9
+import lombok.extern.slf4j.Slf4j;
10
+
11
+import java.io.IOException;
12
+import java.util.*;
13
+import java.util.concurrent.ExecutorService;
14
+import java.util.concurrent.Executors;
15
+import java.util.concurrent.ScheduledExecutorService;
16
+import java.util.concurrent.TimeUnit;
17
+
18
+/**
19
+ * 转码
20
+ */
21
+@Slf4j
22
+public class TransferRtspHelper {
23
+
24
+    private static final ScheduledExecutorService SCHEDULE = Executors.newScheduledThreadPool(2);
25
+    private static final ExecutorService PROCESS_EXECUTOR = Executors.newCachedThreadPool();
26
+    private static Map<String, Transfer> transferMap = new HashMap<>();
27
+
28
+    /**
29
+     * 开启
30
+     */
31
+    public static synchronized void open(String ip) throws IOException {
32
+        Transfer transfer = transferMap.get(ip);
33
+        if (transfer == null) {
34
+            transfer = new Transfer();
35
+            transfer.setIp(ip);
36
+            transfer.setStartTime(new Date());
37
+            transfer.setContinueTime(new Date());
38
+            transfer.setCount(1);
39
+            PROCESS_EXECUTOR.submit(new TransferTask(transfer));
40
+            transferMap.put(ip, transfer);
41
+        } else {
42
+            transfer.setContinueTime(new Date());
43
+            transfer.incCount();
44
+        }
45
+        log.info("开始转码:{}", transfer);
46
+    }
47
+
48
+    /**
49
+     * 关闭
50
+     */
51
+    public static synchronized void close(String ip) {
52
+        Transfer transfer = transferMap.get(ip);
53
+        if (transfer != null) {
54
+            transfer.decCount();
55
+            if (transfer.getCount() <= 0) {
56
+                Process process = transfer.getProcess();
57
+                if (process != null && process.isAlive()) {
58
+                    process.destroy();
59
+                }
60
+                transferMap.remove(ip);
61
+            }
62
+        }
63
+        log.info("结束转码:{}", transfer);
64
+    }
65
+
66
+    /**
67
+     * 若当前有在转码,打印相关信息
68
+     */
69
+    public static void report() {
70
+        Map<String, ChannelGroup> channelGroups = ChannelGroupHolder.getChannelGroups();
71
+        transferMap.forEach((k, v) -> {
72
+            ChannelGroup group = channelGroups.get(k);
73
+            if (group != null && !group.isEmpty()) {
74
+                log.info("当前转码中:{}", v);
75
+            }
76
+        });
77
+    }
78
+
79
+    /**
80
+     * 如果没有对应channel,停止转码,如果前端页面还开着,会自动建立回连接
81
+     */
82
+    public static void clear() {
83
+        List<String> toClearIp = new ArrayList<>();
84
+        Map<String, ChannelGroup> channelGroups = ChannelGroupHolder.getChannelGroups();
85
+        transferMap.forEach((k, v) -> {
86
+            ChannelGroup group = channelGroups.get(k);
87
+            if (group == null || group.isEmpty()
88
+                    || System.currentTimeMillis() - v.getContinueTime().getTime() > 1800_000L) {
89
+                toClearIp.add(k);
90
+            }
91
+        });
92
+        if (!toClearIp.isEmpty()) {
93
+            log.info("关闭以下无效转码:{}", toClearIp);
94
+            toClearIp.forEach(TransferRtspHelper::close);
95
+        }
96
+    }
97
+
98
+    /**
99
+     * 开启定时器
100
+     */
101
+    public static void schedule() {
102
+        SCHEDULE.scheduleAtFixedRate(TransferRtspHelper::report, 1L, 5L, TimeUnit.MINUTES);
103
+        SCHEDULE.scheduleAtFixedRate(TransferRtspHelper::clear, 60L, 5L, TimeUnit.SECONDS);
104
+    }
105
+
106
+    @Data
107
+    @NoArgsConstructor
108
+    @AllArgsConstructor
109
+    private static class Transfer {
110
+        private String ip;
111
+        private Process process;
112
+        private int count;
113
+        private Date startTime;
114
+        private Date continueTime;
115
+
116
+        public void incCount() {
117
+            count++;
118
+        }
119
+
120
+        public void decCount() {
121
+            count--;
122
+        }
123
+    }
124
+
125
+    @Data
126
+    @NoArgsConstructor
127
+    @AllArgsConstructor
128
+    public static class TransferTask implements Runnable {
129
+
130
+        private Transfer transfer;
131
+
132
+        public static FFmpeg fFmpeg;
133
+
134
+        @SneakyThrows
135
+        @Override
136
+        public void run() {
137
+            try {
138
+                String ip = transfer.getIp();
139
+                Process process = fFmpeg.run(ip);
140
+                transfer.setProcess(process);
141
+            } catch (Exception e) {
142
+                e.printStackTrace();
143
+            }
144
+        }
145
+    }
146
+
147
+}

+ 40
- 0
src/main/java/com/yunzhi/inte/rtsp/WebSocketHandler.java Просмотреть файл

@@ -0,0 +1,40 @@
1
+package com.yunzhi.inte.rtsp;
2
+
3
+import io.netty.channel.Channel;
4
+import io.netty.channel.ChannelHandlerContext;
5
+import io.netty.channel.SimpleChannelInboundHandler;
6
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
7
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
8
+import lombok.extern.slf4j.Slf4j;
9
+
10
+/**
11
+ * 处理文本帧
12
+ */
13
+@Slf4j
14
+public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
15
+
16
+    @Override
17
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
18
+        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
19
+            log.info("WebSocket连接成功:{}", ctx.channel().id());
20
+        } else {
21
+            super.userEventTriggered(ctx, evt);
22
+        }
23
+    }
24
+
25
+    @Override
26
+    public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
27
+    }
28
+
29
+    @Override
30
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
31
+        Channel channel = ctx.channel();
32
+        log.info("websocket连接断开:{}", channel.id());
33
+        String toDeleteTag = ChannelGroupHolder.remove(channel);
34
+        if (toDeleteTag != null) {
35
+            TransferRtspHelper.close(toDeleteTag);
36
+        }
37
+        ctx.fireChannelInactive();
38
+    }
39
+
40
+}

+ 3
- 0
src/main/resources/application.yml Просмотреть файл

@@ -34,6 +34,9 @@ my:
34 34
   server-base: http://192.168.89.147:8087/api
35 35
   upload-path: E:\work\jgz\upload
36 36
   static-prefix: /assets
37
+  minitor:
38
+    port: 7777
39
+    ffmpeg: D:\applications\ffmpeg-5.1-essentials_build\bin\ffmpeg.exe
37 40
 
38 41
 ###
39 42
 mybatis-plus: