Merge branch 'fix/recvData' into 'main'

Feat: Stream data by stream-id

See merge request ayushm99/apl-nuke!1
main v1.1.9
Ayush Mukherjee 4 years ago
commit 3a5defdde4

@ -1,6 +1,6 @@
{ {
"name": "nuke", "name": "nuke",
"version": "1.0.0", "version": "1.1.9",
"description": "APL Esports' Nuke Server", "description": "APL Esports' Nuke Server",
"main": "index.js", "main": "index.js",
"scripts": { "scripts": {

@ -23,6 +23,7 @@ router.post('/', async (req, res) => {
push(id) push(id)
res.json({ res.json({
session: id, session: id,
name: r.data.entries[0].name,
}) })
} else { } else {
res.status(403).json({ res.status(403).json({

@ -4,10 +4,12 @@ const url = require('url')
const app = require('./app') const app = require('./app')
const { get } = require('./uuids') const { get } = require('./uuids')
const handler = require('./ws/handler') const handler = require('./ws/handler')
const { subscribe } = require('./ws/streams')
mongoose.connect(process.env.MONGO_URI, { mongoose.connect(process.env.MONGO_URI, {
useNewUrlParser: true, useNewUrlParser: true,
useUnifiedTopology: true, useUnifiedTopology: true,
useFindAndModify: false,
}) })
const db = mongoose.connection const db = mongoose.connection
@ -15,15 +17,24 @@ db.once('open', () => console.log('Connected to mongodb instance!'))
const port = process.env.PORT || 5000; const port = process.env.PORT || 5000;
wss = new WebSocket.Server({ wss1 = new WebSocket.Server({
noServer: true, noServer: true,
}) })
wss.on('connection', (ws, req) => { wss2 = new WebSocket.Server({
noServer: true,
})
wss1.on('connection', (ws, req) => {
const id = req.id const id = req.id
handler(ws, id) handler(ws, id)
}) })
wss2.on('connection', (ws, req) => {
const id = req.id
subscribe(ws, id)
})
const server = app.listen(port, () => { const server = app.listen(port, () => {
console.log(`Express listening: http://localhost:${port}`) console.log(`Express listening: http://localhost:${port}`)
}) })
@ -32,9 +43,14 @@ server.on('upgrade', (req, socket, head) => {
const pathname = url.parse(req.url).pathname.split('/') const pathname = url.parse(req.url).pathname.split('/')
const token = pathname[2] const token = pathname[2]
if (pathname[1] === 'ws' && get(token)) { if (pathname[1] === 'ws' && get(token)) {
wss.handleUpgrade(req, socket, head, (socket) => { wss1.handleUpgrade(req, socket, head, (socket) => {
req.id = token
wss1.emit('connection', socket, req);
})
} else if (pathname[1] === 'stream') {
wss2.handleUpgrade(req, socket, head, (socket) => {
req.id = token req.id = token
wss.emit('connection', socket, req); wss2.emit('connection', socket, req);
}) })
} }
}) })

@ -15,8 +15,7 @@ const casters = new Schema({
}, },
image: { image: {
type: String, type: String,
required: true, }
},
}); });
const Caster = model('Casters', casters) const Caster = model('Casters', casters)

@ -5,9 +5,9 @@ const events = new mongoose.Schema({
type: String, type: String,
required: true, required: true,
unique: true, unique: true,
}, }
}); });
const Event = mongoose.model('Events', events) const Event = mongoose.model('Event', events)
module.exports = Event module.exports = Event

@ -15,8 +15,7 @@ const hosts = new Schema({
}, },
image: { image: {
type: String, type: String,
required: true, }
},
}); });
const Host = model('Hosts', hosts) const Host = model('Hosts', hosts)

@ -3,23 +3,23 @@ const { Schema, model } = require('mongoose')
const gameScores = new Schema({ const gameScores = new Schema({
orange: { orange: {
type: Number, type: Number,
required: true, default: 0,
}, },
blue: { blue: {
type: Number, type: Number,
required: true, default : 0,
}, },
}) })
const matches = new Schema({ const matches = new Schema({
orange: { orange: {
type: Schema.Types.ObjectId, type: Schema.Types.ObjectId,
refs: 'Rosters', ref: 'Rosters',
required: true, required: true,
}, },
blue: { blue: {
type: Schema.Types.ObjectId, type: Schema.Types.ObjectId,
refs: 'Rosters', ref: 'Rosters',
required: true, required: true,
}, },
started: { started: {
@ -41,17 +41,16 @@ const matches = new Schema({
series: { series: {
orange: { orange: {
type: Number, type: Number,
required: true, default: 0,
}, },
blue: { blue: {
type: Number, type: Number,
required: true, default: 0,
}, },
}, },
games: { games: {
type: [gameScores], type: [gameScores],
required: true, }
},
}) })
const Match = model('Matches', matches) const Match = model('Matches', matches)

@ -3,23 +3,23 @@ const { Schema, model } = require('mongoose')
const stats = new Schema({ const stats = new Schema({
goals: { goals: {
type: Number, type: Number,
required: true, default: 0,
}, },
assists: { assists: {
type: Number, type: Number,
required: true, default: 0,
}, },
saves: { saves: {
type: Number, type: Number,
required: true, default: 0,
}, },
shots: { shots: {
type: Number, type: Number,
required: true, default: 0,
}, },
demos: { demos: {
type: Number, type: Number,
required: true, default: 0,
}, },
}) })
@ -32,18 +32,9 @@ const players = new Schema({
type: String, type: String,
required: true, required: true,
}, },
stats: {
type: [stats],
required: true,
},
}) })
const rosters = new Schema({ const rosters = new Schema({
event: {
type: Schema.Types.ObjectId,
ref: 'Events',
required: true,
},
name: { name: {
type: String, type: String,
required: true, required: true,
@ -54,8 +45,10 @@ const rosters = new Schema({
}, },
players: { players: {
type: [players], type: [players],
required: true,
}, },
stats: {
type: stats,
}
}) })
const Roster = model('Rosters', rosters) const Roster = model('Rosters', rosters)

@ -7,14 +7,20 @@ const streams = new Schema({
}, },
event: { event: {
type: Schema.Types.ObjectId, type: Schema.Types.ObjectId,
refs: 'Events', ref: 'Event',
requried: true,
},
matches: {
type: [Schema.Types.ObjectId],
refs: 'matches',
requried: true,
}, },
matches: [{
type: Schema.Types.ObjectId,
ref: 'Matches',
}],
casters: [{
type: Schema.Types.ObjectId,
ref: 'Casters',
}],
hosts: [{
type: Schema.Types.ObjectId,
ref: 'Hosts',
}],
}) })
const Stream = model('Streams', streams) const Stream = model('Streams', streams)

@ -14,7 +14,9 @@ const events = [
'delete', 'delete',
]; ];
const channelEvents = []; const channelEvents = [
'streams:full',
];
channels.forEach((c) => { channels.forEach((c) => {
events.forEach((e) => { events.forEach((e) => {

@ -7,10 +7,10 @@ const hosts = require('../models/hosts')
const eventFns = { const eventFns = {
getAll: async () => { getAll: async () => {
return await events.find().exec() return await events.find().populate('streams').exec()
}, },
getById: async (id) => { getById: async (id) => {
return await events.findById(id).exec() return await events.findById(id).populate('streams').exec()
}, },
update: async(id, data) => { update: async(id, data) => {
return await events.findByIdAndUpdate(id, data).exec() return await events.findByIdAndUpdate(id, data).exec()
@ -23,10 +23,10 @@ const eventFns = {
const ev = new events(data) const ev = new events(data)
ev.save((err) => { ev.save((err) => {
if (err) if (err)
throw err; console.warn(err)
}) })
} catch (e) { } catch (e) {
throw e; console.warn(e)
} }
} }
} }
@ -49,20 +49,20 @@ const rosterFns = {
const ev = new rosters(data) const ev = new rosters(data)
ev.save((err) => { ev.save((err) => {
if (err) if (err)
throw err; console.warn(err)
}) })
} catch (e) { } catch (e) {
throw e; console.warn(e)
} }
} }
} }
const matchFns = { const matchFns = {
getAll: async () => { getAll: async () => {
return await matches.find().exec() return await matches.find().populate('orange').populate('blue').exec()
}, },
getById: async (id) => { getById: async (id) => {
return await matches.findById(id).exec() return await matches.findById(id).populate('orange').populate('blue').exec()
}, },
update: async(id, data) => { update: async(id, data) => {
return await matches.findByIdAndUpdate(id, data).exec() return await matches.findByIdAndUpdate(id, data).exec()
@ -75,10 +75,10 @@ const matchFns = {
const ev = new matches(data) const ev = new matches(data)
ev.save((err) => { ev.save((err) => {
if (err) if (err)
throw err; console.warn(err)
}) })
} catch (e) { } catch (e) {
throw e; console.warn(e)
} }
} }
} }
@ -87,8 +87,14 @@ const streamFns = {
getAll: async () => { getAll: async () => {
return await streams.find().exec() return await streams.find().exec()
}, },
getAllPop: async () => {
return await streams.find().populate('event').populate({
path: 'matches',
populate: [{ path: 'orange' }, { path: 'blue' }],
}).populate('casters').populate('hosts').exec()
},
getById: async (id) => { getById: async (id) => {
return await streams.findById(id).exec() return await streams.findById(id).populate('event').populate('matches').populate('casters').populate('hosts').exec()
}, },
update: async(id, data) => { update: async(id, data) => {
return await streams.findByIdAndUpdate(id, data).exec() return await streams.findByIdAndUpdate(id, data).exec()
@ -101,10 +107,10 @@ const streamFns = {
const ev = new streams(data) const ev = new streams(data)
ev.save((err) => { ev.save((err) => {
if (err) if (err)
throw err; console.warn(err)
}) })
} catch (e) { } catch (e) {
throw e; console.warn(e)
} }
} }
} }
@ -127,10 +133,10 @@ const casterFns = {
const ev = new casters(data) const ev = new casters(data)
ev.save((err) => { ev.save((err) => {
if (err) if (err)
throw err; console.warn(err)
}) })
} catch (e) { } catch (e) {
throw e; console.warn(e)
} }
} }
} }
@ -153,10 +159,10 @@ const hostFns = {
const ev = new hosts(data) const ev = new hosts(data)
ev.save((err) => { ev.save((err) => {
if (err) if (err)
throw err; console.warn(err)
}) })
} catch (e) { } catch (e) {
throw e; console.warn(e)
} }
} }
} }

@ -1,6 +1,7 @@
const { pop } = require('../uuids') const { pop } = require('../uuids')
const { channelEvents } = require('./channels') const { channelEvents } = require('./channels')
const crud = require('./crud') const crud = require('./crud')
const { recvUpdate } = require('./streams')
const connections = {}; const connections = {};
@ -12,7 +13,7 @@ const handler = (ws, id) => {
ws.send(JSON.stringify({ ws.send(JSON.stringify({
event: 'info', event: 'info',
data: 'Welcome to APL Nuke v1.0.0!' data: 'Welcome to APL Nuke v1.1.0!'
})) }))
ws.on('message', (msg) => handleMsg(msg, id)) ws.on('message', (msg) => handleMsg(msg, id))
@ -26,25 +27,38 @@ const handleMsg = async (msg, id) => {
try { try {
const d = JSON.parse(msg) const d = JSON.parse(msg)
if (d.hasOwnProperty('subscribe') && channelEvents.indexOf(d.subscribe) !== -1) { if (d.hasOwnProperty('subscribe') && channelEvents.indexOf(d.subscribe) !== -1) {
console.log('received sub for ', d.subscribe)
const channel = d.subscribe.split(':')[0] const channel = d.subscribe.split(':')[0]
connections[id].events.push(d.subscribe) connections[id].events.push(d.subscribe)
connections[id].connection.send(JSON.stringify(await crud[channel].getAll())) const dm = {
event: `${channel}:read`,
data: await crud[channel].getAll(),
}
if (channel === 'streams' && d.subscribe === 'streams:full') {
dm.event = 'streams:full'
dm.data = await crud[channel].getAllPop()
}
connections[id].connection.send(JSON.stringify(dm))
} else if (d.hasOwnProperty('event') && channelEvents.indexOf(d.event) !== -1) { } else if (d.hasOwnProperty('event') && channelEvents.indexOf(d.event) !== -1) {
const ev = d.event.split(':') const ev = d.event.split(':')
const channel = ev[0] const channel = ev[0]
const event = ev[1] const event = ev[1]
console.log(`received event for ${channel} with data %s`, d.data) console.log(`received event for ${channel} with data %s`, d.data)
let dm;
switch (event) { switch (event) {
case 'create': case 'create':
await crud[channel].create(d.data) dm = await crud[channel].create(d.data)
recvUpdate(channel, dm)
fanoutMsg(channel, await crud[channel].getAll()) fanoutMsg(channel, await crud[channel].getAll())
break; break;
case 'update': case 'update':
await crud[channel].update(d.data.id, d.data.data) dm = await crud[channel].update(d.data.id, d.data.data)
recvUpdate(channel, dm)
fanoutMsg(channel, await crud[channel].getAll()) fanoutMsg(channel, await crud[channel].getAll())
break; break;
case 'delete': case 'delete':
await crud[channel].delete(d.data.id) await crud[channel].delete(d.data.id)
recvUpdate(channel, 'delete')
fanoutMsg(channel, await crud[channel].getAll()) fanoutMsg(channel, await crud[channel].getAll())
break; break;
} }
@ -55,9 +69,21 @@ const handleMsg = async (msg, id) => {
} }
const fanoutMsg = (channel, data) => { const fanoutMsg = (channel, data) => {
Object.keys(connections).forEach((k) => { Object.keys(connections).forEach( async (k) => {
if (connections[k].events.indexOf(`${channel}:read`) !== -1) { if (connections[k].events.indexOf(`${channel}:read`) !== -1) {
connections[k].connection.send(JSON.stringify(data)) const d = {
event: `${channel}:read`,
data,
}
connections[k].connection.send(JSON.stringify(d))
if (connections[k].events.indexOf('streams:full') !== -1) {
const m = await crud['streams'].getAllPop()
const d = {
event: 'streams:full',
data: m,
}
connections[k].connection.send(JSON.stringify(d))
}
} }
}) })
} }

@ -0,0 +1,50 @@
const { v4 } = require('uuid')
const crud = require('./crud')
const connections = {}
const subscribe = (ws, sid) => {
const id = v4()
connections[id] = {
connection: ws,
streamid: sid,
}
ws.send(JSON.stringify({
event: 'info',
data: 'Welcome to APL Nuke v1.1.0!',
}))
sendInitial(id)
ws.on('close', () => {
delete connections[id]
})
}
const cModel = {
'events': 'event',
'matches': 'matches',
'casters': 'casters',
'hosts': 'hosts',
}
const recvUpdate = async (channel, data) => {
Object.keys(connections).forEach((k) => {
sendInitial(k)
})
}
const sendInitial = async (id) => {
const c = connections[id]
const stream = await crud['streams'].getById(c.streamid)
c.connection.send(JSON.stringify({
event: 'streams:read',
data: stream,
}))
}
module.exports = {
subscribe,
recvUpdate,
}
Loading…
Cancel
Save