implemented per-stream data views

main
Ayush Mukherjee 4 years ago
parent 2ca601bc02
commit 99467f229c

@ -4,6 +4,7 @@ const url = require('url')
const app = require('./app')
const { get } = require('./uuids')
const handler = require('./ws/handler')
const { subscribe } = require('./ws/streams')
mongoose.connect(process.env.MONGO_URI, {
useNewUrlParser: true,
@ -15,15 +16,24 @@ db.once('open', () => console.log('Connected to mongodb instance!'))
const port = process.env.PORT || 5000;
wss = new WebSocket.Server({
wss1 = new WebSocket.Server({
noServer: true,
})
wss2 = new WebSocket.Server({
noServer: true,
})
wss.on('connection', (ws, req) => {
wss1.on('connection', (ws, req) => {
const id = req.id
handler(ws, id)
})
wss2.on('connection', (ws, req) => {
const id = req.id
subscribe(ws, id)
})
const server = app.listen(port, () => {
console.log(`Express listening: http://localhost:${port}`)
})
@ -32,9 +42,14 @@ server.on('upgrade', (req, socket, head) => {
const pathname = url.parse(req.url).pathname.split('/')
const token = pathname[2]
if (pathname[1] === 'ws' && get(token)) {
wss.handleUpgrade(req, socket, head, (socket) => {
wss1.handleUpgrade(req, socket, head, (socket) => {
req.id = token
wss1.emit('connection', socket, req);
})
} else if (pathname[1] === 'stream') {
wss2.handleUpgrade(req, socket, head, (socket) => {
req.id = token
wss.emit('connection', socket, req);
wss2.emit('connection', socket, req);
})
}
})

@ -16,7 +16,7 @@ const casters = new Schema({
image: {
type: String,
required: true,
},
}
});
const Caster = model('Casters', casters)

@ -5,9 +5,9 @@ const events = new mongoose.Schema({
type: String,
required: true,
unique: true,
},
}
});
const Event = mongoose.model('Events', events)
const Event = mongoose.model('Event', events)
module.exports = Event

@ -16,7 +16,7 @@ const hosts = new Schema({
image: {
type: String,
required: true,
},
}
});
const Host = model('Hosts', hosts)

@ -51,7 +51,7 @@ const matches = new Schema({
games: {
type: [gameScores],
required: true,
},
}
})
const Match = model('Matches', matches)

@ -39,11 +39,6 @@ const players = new Schema({
})
const rosters = new Schema({
event: {
type: Schema.Types.ObjectId,
ref: 'Events',
required: true,
},
name: {
type: String,
required: true,

@ -7,13 +7,19 @@ const streams = new Schema({
},
event: {
type: Schema.Types.ObjectId,
refs: 'Events',
requried: true,
ref: 'Event',
},
matches: {
type: [Schema.Types.ObjectId],
refs: 'matches',
requried: true,
ref: 'Matches',
},
casters: {
type: [Schema.Types.ObjectId],
ref: 'Casters',
},
hosts: {
type: [Schema.Types.ObjectId],
refs: 'Hosts',
},
})

@ -7,10 +7,10 @@ const hosts = require('../models/hosts')
const eventFns = {
getAll: async () => {
return await events.find().exec()
return await events.find().populate('streams').exec()
},
getById: async (id) => {
return await events.findById(id).exec()
return await events.findById(id).populate('streams').exec()
},
update: async(id, data) => {
return await events.findByIdAndUpdate(id, data).exec()
@ -33,10 +33,10 @@ const eventFns = {
const rosterFns = {
getAll: async () => {
return await rosters.find().exec()
return await rosters.find().populate('event').populate('match').exec()
},
getById: async (id) => {
return await rosters.findById(id).exec()
return await rosters.findById(id).populate('event').populate('match').exec()
},
update: async(id, data) => {
return await rosters.findByIdAndUpdate(id, data).exec()
@ -59,10 +59,10 @@ const rosterFns = {
const matchFns = {
getAll: async () => {
return await matches.find().exec()
return await matches.find().populate('orange').populate('blue').populate('stream').exec()
},
getById: async (id) => {
return await matches.findById(id).exec()
return await matches.findById(id).populate('orange').populate('blue').populate('stream').exec()
},
update: async(id, data) => {
return await matches.findByIdAndUpdate(id, data).exec()
@ -85,10 +85,10 @@ const matchFns = {
const streamFns = {
getAll: async () => {
return await streams.find().exec()
return await streams.find().populate('event').populate('matches').populate('casters').populate('hosts').exec()
},
getById: async (id) => {
return await streams.findById(id).exec()
return await streams.findById(id).populate('event').populate('matches').populate('casters').populate('hosts').exec()
},
update: async(id, data) => {
return await streams.findByIdAndUpdate(id, data).exec()
@ -111,10 +111,10 @@ const streamFns = {
const casterFns = {
getAll: async () => {
return await casters.find().exec()
return await casters.find().populate('streams').exec()
},
getById: async (id) => {
return await casters.findById(id).exec()
return await casters.findById(id).populate('streams').exec()
},
update: async(id, data) => {
return await casters.findByIdAndUpdate(id, data).exec()
@ -137,10 +137,10 @@ const casterFns = {
const hostFns = {
getAll: async () => {
return await hosts.find().exec()
return await hosts.find().populate('streams').exec()
},
getById: async (id) => {
return await hosts.findById(id).exec()
return await hosts.findById(id).populate('streams').exec()
},
update: async(id, data) => {
return await hosts.findByIdAndUpdate(id, data).exec()

@ -12,7 +12,7 @@ const handler = (ws, id) => {
ws.send(JSON.stringify({
event: 'info',
data: 'Welcome to APL Nuke v1.0.0!'
data: 'Welcome to APL Nuke v1.1.0!'
}))
ws.on('message', (msg) => handleMsg(msg, id))
@ -36,11 +36,14 @@ const handleMsg = async (msg, id) => {
console.log(`received event for ${channel} with data %s`, d.data)
switch (event) {
case 'create':
await crud[channel].create(d.data)
const da = await crud[channel].create(d.data)
connections[id].connection.send(JSON.stringify(da))
fanoutMsg(channel, await crud[channel].getAll())
fanoutMsg(channel, await crud[channel].getAll())
break;
case 'update':
await crud[channel].update(d.data.id, d.data.data)
const du = await crud[channel].update(d.data.id, d.data.data)
connections[id].connection.send(JSON.stringify(du))
fanoutMsg(channel, await crud[channel].getAll())
break;
case 'delete':

@ -0,0 +1,53 @@
const { v4 } = require('uuid')
const crud = require('./crud')
const connections = {}
const subscribe = (ws, sid) => {
const id = v4()
connections[id] = {
connection: ws,
streamid: sid,
}
ws.send(JSON.stringify({
event: 'info',
data: 'Welcome to APL Nuke v1.1.0!',
}))
sendInitial(id)
ws.on('close', () => {
delete connections[id]
})
}
const cModel = {
'events': 'event',
'matches': 'matches',
'casters': 'casters',
'hosts': 'hosts',
}
const recvUpdate = async (channel, data) => {
const streams = await crud['streams'].getAll()
const stream = streams.filter(x => x[cModel[channel]]._id === data._id)[0]
const c = connections.filter(x => x.streamid === stream._id)
Object.keys(c).forEach((k) => {
c[k].connection.send(JSON.stringify(stream))
})
}
const sendInitial = async (id) => {
const c = connections[id]
const stream = await crud['streams'].getById(c.streamid)
c.connection.send(JSON.stringify({
event: 'streams:read',
data: stream,
}))
}
module.exports = {
subscribe,
recvUpdate,
}
Loading…
Cancel
Save