OpenShot Audio Library | OpenShotAudio 0.4.0
Loading...
Searching...
No Matches
juce_InterprocessConnection.cpp
1/*
2 ==============================================================================
3
4 This file is part of the JUCE library.
5 Copyright (c) 2022 - Raw Material Software Limited
6
7 JUCE is an open source library subject to commercial or open-source
8 licensing.
9
10 The code included in this file is provided under the terms of the ISC license
11 http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12 To use, copy, modify, and/or distribute this software for any purpose with or
13 without fee is hereby granted provided that the above copyright notice and
14 this permission notice appear in all copies.
15
16 JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17 EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18 DISCLAIMED.
19
20 ==============================================================================
21*/
22
23namespace juce
24{
25
26struct InterprocessConnection::ConnectionThread final : public Thread
27{
28 ConnectionThread (InterprocessConnection& c) : Thread ("JUCE IPC"), owner (c) {}
29 void run() override { owner.runThread(); }
30
32 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionThread)
33};
34
35class SafeActionImpl
36{
37public:
38 explicit SafeActionImpl (InterprocessConnection& p)
39 : ref (p) {}
40
41 template <typename Fn>
42 void ifSafe (Fn&& fn)
43 {
44 const ScopedLock lock (mutex);
45
46 if (safe)
47 fn (ref);
48 }
49
50 void setSafe (bool s)
51 {
52 const ScopedLock lock (mutex);
53 safe = s;
54 }
55
56 bool isSafe()
57 {
58 const ScopedLock lock (mutex);
59 return safe;
60 }
61
62private:
63 CriticalSection mutex;
64 InterprocessConnection& ref;
65 bool safe = false;
66};
67
68class InterprocessConnection::SafeAction final : public SafeActionImpl
69{
70 using SafeActionImpl::SafeActionImpl;
71};
72
73//==============================================================================
74InterprocessConnection::InterprocessConnection (bool callbacksOnMessageThread, uint32 magicMessageHeaderNumber)
75 : useMessageThread (callbacksOnMessageThread),
76 magicMessageHeader (magicMessageHeaderNumber),
77 safeAction (std::make_shared<SafeAction> (*this))
78{
79 thread.reset (new ConnectionThread (*this));
80}
81
83{
84 // You *must* call `disconnect` in the destructor of your derived class to ensure
85 // that any pending messages are not delivered. If the messages were delivered after
86 // destroying the derived class, we'd end up calling the pure virtual implementations
87 // of `messageReceived`, `connectionMade` and `connectionLost` which is definitely
88 // not a good idea!
89 jassert (! safeAction->isSafe());
90
91 callbackConnectionState = false;
92 disconnect (4000, Notify::no);
93 thread.reset();
94}
95
96//==============================================================================
98 int portNumber, int timeOutMillisecs)
99{
100 disconnect();
101
102 auto s = std::make_unique<StreamingSocket>();
103
104 if (s->connect (hostName, portNumber, timeOutMillisecs))
105 {
106 const ScopedWriteLock sl (pipeAndSocketLock);
107 initialiseWithSocket (std::move (s));
108 return true;
109 }
110
111 return false;
112}
113
114bool InterprocessConnection::connectToPipe (const String& pipeName, int timeoutMs)
115{
116 disconnect();
117
118 auto newPipe = std::make_unique<NamedPipe>();
119
120 if (newPipe->openExisting (pipeName))
121 {
122 const ScopedWriteLock sl (pipeAndSocketLock);
123 pipeReceiveMessageTimeout = timeoutMs;
124 initialiseWithPipe (std::move (newPipe));
125 return true;
126 }
127
128 return false;
129}
130
131bool InterprocessConnection::createPipe (const String& pipeName, int timeoutMs, bool mustNotExist)
132{
133 disconnect();
134
135 auto newPipe = std::make_unique<NamedPipe>();
136
137 if (newPipe->createNewPipe (pipeName, mustNotExist))
138 {
139 const ScopedWriteLock sl (pipeAndSocketLock);
140 pipeReceiveMessageTimeout = timeoutMs;
141 initialiseWithPipe (std::move (newPipe));
142 return true;
143 }
144
145 return false;
146}
147
148void InterprocessConnection::disconnect (int timeoutMs, Notify notify)
149{
150 thread->signalThreadShouldExit();
151
152 {
153 const ScopedReadLock sl (pipeAndSocketLock);
154 if (socket != nullptr) socket->close();
155 if (pipe != nullptr) pipe->close();
156 }
157
158 thread->stopThread (timeoutMs);
159 deletePipeAndSocket();
160
161 if (notify == Notify::yes)
162 connectionLostInt();
163
164 callbackConnectionState = false;
165 safeAction->setSafe (false);
166}
167
168void InterprocessConnection::deletePipeAndSocket()
169{
170 const ScopedWriteLock sl (pipeAndSocketLock);
171 socket.reset();
172 pipe.reset();
173}
174
176{
177 const ScopedReadLock sl (pipeAndSocketLock);
178
179 return ((socket != nullptr && socket->isConnected())
180 || (pipe != nullptr && pipe->isOpen()))
181 && threadIsRunning;
182}
183
185{
186 {
187 const ScopedReadLock sl (pipeAndSocketLock);
188
189 if (pipe == nullptr && socket == nullptr)
190 return {};
191
192 if (socket != nullptr && ! socket->isLocal())
193 return socket->getHostName();
194 }
195
196 return IPAddress::local().toString();
197}
198
199//==============================================================================
201{
202 uint32 messageHeader[2] = { ByteOrder::swapIfBigEndian (magicMessageHeader),
203 ByteOrder::swapIfBigEndian ((uint32) message.getSize()) };
204
205 MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
206 messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
207 messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
208
209 return writeData (messageData.getData(), (int) messageData.getSize()) == (int) messageData.getSize();
210}
211
212int InterprocessConnection::writeData (void* data, int dataSize)
213{
214 const ScopedReadLock sl (pipeAndSocketLock);
215
216 if (socket != nullptr)
217 return socket->write (data, dataSize);
218
219 if (pipe != nullptr)
220 return pipe->write (data, dataSize, pipeReceiveMessageTimeout);
221
222 return 0;
223}
224
225//==============================================================================
226void InterprocessConnection::initialise()
227{
228 safeAction->setSafe (true);
229 threadIsRunning = true;
230 connectionMadeInt();
231 thread->startThread();
232}
233
234void InterprocessConnection::initialiseWithSocket (std::unique_ptr<StreamingSocket> newSocket)
235{
236 jassert (socket == nullptr && pipe == nullptr);
237 socket = std::move (newSocket);
238 initialise();
239}
240
241void InterprocessConnection::initialiseWithPipe (std::unique_ptr<NamedPipe> newPipe)
242{
243 jassert (socket == nullptr && pipe == nullptr);
244 pipe = std::move (newPipe);
245 initialise();
246}
247
248//==============================================================================
249struct ConnectionStateMessage final : public MessageManager::MessageBase
250{
251 ConnectionStateMessage (std::shared_ptr<SafeActionImpl> ipc, bool connected) noexcept
252 : safeAction (ipc), connectionMade (connected)
253 {}
254
255 void messageCallback() override
256 {
257 safeAction->ifSafe ([this] (InterprocessConnection& owner)
258 {
259 if (connectionMade)
260 owner.connectionMade();
261 else
262 owner.connectionLost();
263 });
264 }
265
266 std::shared_ptr<SafeActionImpl> safeAction;
267 bool connectionMade;
268
269 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage)
270};
271
272void InterprocessConnection::connectionMadeInt()
273{
274 if (! callbackConnectionState)
275 {
276 callbackConnectionState = true;
277
278 if (useMessageThread)
279 (new ConnectionStateMessage (safeAction, true))->post();
280 else
282 }
283}
284
285void InterprocessConnection::connectionLostInt()
286{
287 if (callbackConnectionState)
288 {
289 callbackConnectionState = false;
290
291 if (useMessageThread)
292 (new ConnectionStateMessage (safeAction, false))->post();
293 else
295 }
296}
297
298struct DataDeliveryMessage final : public Message
299{
300 DataDeliveryMessage (std::shared_ptr<SafeActionImpl> ipc, const MemoryBlock& d)
301 : safeAction (ipc), data (d)
302 {}
303
304 void messageCallback() override
305 {
306 safeAction->ifSafe ([this] (InterprocessConnection& owner)
307 {
308 owner.messageReceived (data);
309 });
310 }
311
312 std::shared_ptr<SafeActionImpl> safeAction;
313 MemoryBlock data;
314};
315
316void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
317{
318 jassert (callbackConnectionState);
319
320 if (useMessageThread)
321 (new DataDeliveryMessage (safeAction, data))->post();
322 else
323 messageReceived (data);
324}
325
326//==============================================================================
327int InterprocessConnection::readData (void* data, int num)
328{
329 const ScopedReadLock sl (pipeAndSocketLock);
330
331 if (socket != nullptr)
332 return socket->read (data, num, true);
333
334 if (pipe != nullptr)
335 return pipe->read (data, num, pipeReceiveMessageTimeout);
336
337 jassertfalse;
338 return -1;
339}
340
341bool InterprocessConnection::readNextMessage()
342{
343 uint32 messageHeader[2];
344 auto bytes = readData (messageHeader, sizeof (messageHeader));
345
346 if (bytes == (int) sizeof (messageHeader)
347 && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
348 {
349 auto bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
350
351 if (bytesInMessage > 0)
352 {
353 MemoryBlock messageData ((size_t) bytesInMessage, true);
354 int bytesRead = 0;
355
356 while (bytesInMessage > 0)
357 {
358 if (thread->threadShouldExit())
359 return false;
360
361 auto numThisTime = jmin (bytesInMessage, 65536);
362 auto bytesIn = readData (addBytesToPointer (messageData.getData(), bytesRead), numThisTime);
363
364 if (bytesIn <= 0)
365 break;
366
367 bytesRead += bytesIn;
368 bytesInMessage -= bytesIn;
369 }
370
371 if (bytesRead >= 0)
372 deliverDataInt (messageData);
373 }
374
375 return true;
376 }
377
378 if (bytes < 0)
379 {
380 if (socket != nullptr)
381 deletePipeAndSocket();
382
383 connectionLostInt();
384 }
385
386 return false;
387}
388
389void InterprocessConnection::runThread()
390{
391 while (! thread->threadShouldExit())
392 {
393 if (socket != nullptr)
394 {
395 auto ready = socket->waitUntilReady (true, 100);
396
397 if (ready < 0)
398 {
399 deletePipeAndSocket();
400 connectionLostInt();
401 break;
402 }
403
404 if (ready == 0)
405 {
406 thread->wait (1);
407 continue;
408 }
409 }
410 else if (pipe != nullptr)
411 {
412 if (! pipe->isOpen())
413 {
414 deletePipeAndSocket();
415 connectionLostInt();
416 break;
417 }
418 }
419 else
420 {
421 break;
422 }
423
424 if (thread->threadShouldExit() || ! readNextMessage())
425 break;
426 }
427
428 threadIsRunning = false;
429}
430
431} // namespace juce
static Type swapIfBigEndian(Type value) noexcept
static IPAddress local(bool IPv6=false) noexcept
String toString() const
virtual void connectionMade()=0
virtual void messageReceived(const MemoryBlock &message)=0
void disconnect(int timeoutMs=-1, Notify notify=Notify::yes)
InterprocessConnection(bool callbacksOnMessageThread=true, uint32 magicMessageHeaderNumber=0xf2b49e2c)
bool createPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist=false)
bool connectToPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs)
virtual void connectionLost()=0
bool sendMessage(const MemoryBlock &message)
bool connectToSocket(const String &hostName, int portNumber, int timeOutMillisecs)
void copyFrom(const void *srcData, int destinationOffset, size_t numBytes) noexcept
void * getData() noexcept
size_t getSize() const noexcept
Thread(const String &threadName, size_t threadStackSize=osDefaultStackSize)
virtual void run()=0