Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
CAP
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
tsai
CAP
Commits
70a4c54e
Commit
70a4c54e
authored
Dec 29, 2016
by
yangxiaodong
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add event bus
parent
189b546a
Changes
13
Hide whitespace changes
Inline
Side-by-side
Showing
13 changed files
with
741 additions
and
0 deletions
+741
-0
BackgroundWorker.cs
src/Cap.Consistency/EventBus/BackgroundWorker.cs
+107
-0
BrokeredMessage.cs
src/Cap.Consistency/EventBus/BrokeredMessage.cs
+9
-0
EventBusBase.cs
src/Cap.Consistency/EventBus/EventBusBase.cs
+155
-0
EventBusFactory.cs
src/Cap.Consistency/EventBus/EventBusFactory.cs
+23
-0
EventBusOptions.cs
src/Cap.Consistency/EventBus/EventBusOptions.cs
+20
-0
EventHandlerHolder.cs
src/Cap.Consistency/EventBus/EventHandlerHolder.cs
+49
-0
EventSubscriberAttribute.cs
src/Cap.Consistency/EventBus/EventSubscriberAttribute.cs
+13
-0
IBackgroundWorker.cs
src/Cap.Consistency/EventBus/IBackgroundWorker.cs
+18
-0
IEventBus.cs
src/Cap.Consistency/EventBus/IEventBus.cs
+35
-0
IEventBusFactory.cs
src/Cap.Consistency/EventBus/IEventBusFactory.cs
+9
-0
OrderedEventBus.cs
src/Cap.Consistency/EventBus/Ordered/OrderedEventBus.cs
+119
-0
ReceiveResult.cs
src/Cap.Consistency/EventBus/ReceiveResult.cs
+25
-0
SimpleEventBus.cs
src/Cap.Consistency/EventBus/Simple/SimpleEventBus.cs
+159
-0
No files found.
src/Cap.Consistency/EventBus/BackgroundWorker.cs
0 → 100644
View file @
70a4c54e
using
System
;
using
System.Threading
;
using
System.Threading.Tasks
;
using
Microsoft.Extensions.Logging
;
namespace
Cap.Consistency.EventBus
{
public
abstract
class
BackgroundWorker
:
IBackgroundWorker
,
IDisposable
{
protected
readonly
ILogger
_logger
;
#if FEATURE_THREAD
protected
Thread
_dispatchThread
;
#else
protected
Task
_dispatchThread
;
#endif
protected
CancellationTokenSource
_cancellationTokenSource
;
public
virtual
bool
IsRunning
{
get
{
return
this
.
_dispatchThread
!=
null
&&
#if FEATURE_THREAD
this
.
_dispatchThread
.
ThreadState
==
ThreadState
.
Running
;
#else
this
.
_dispatchThread
.
Status
==
TaskStatus
.
Running
;
#endif
}
}
protected
BackgroundWorker
(
ILoggerFactory
loggerFactory
)
{
this
.
_logger
=
loggerFactory
.
CreateLogger
(
this
.
GetType
().
FullName
);
}
public
virtual
void
Start
()
{
this
.
Start
(
false
);
}
public
virtual
void
Start
(
bool
force
)
{
if
(!
force
)
{
if
(
this
.
IsRunning
)
{
return
;
}
}
this
.
_cancellationTokenSource
=
new
CancellationTokenSource
();
#if !FEATURE_THREAD
this
.
_dispatchThread
=
this
.
ThreadWorker
(
this
.
_cancellationTokenSource
.
Token
);
#else
this
.
_dispatchThread
=
new
Thread
((
userObject
)
=>
{
this
.
ThreadWorker
(
userObject
).
GetAwaiter
().
GetResult
();
})
{
IsBackground
=
true
,
Name
=
$"
{
this
.
GetType
().
Name
}
-Thread-
{
Guid
.
NewGuid
().
ToString
()}
"
};
this
.
_dispatchThread
.
Start
(
this
.
_cancellationTokenSource
.
Token
);
#endif
}
public
virtual
void
Stop
(
int
timeout
=
2000
)
{
Task
.
WaitAny
(
Task
.
Run
(()
=>
{
this
.
_cancellationTokenSource
.
Cancel
();
while
(
this
.
IsRunning
)
{
Task
.
Delay
(
500
).
GetAwaiter
().
GetResult
();
}
}),
Task
.
Delay
(
timeout
));
}
protected
virtual
async
Task
ThreadWorker
(
object
userObject
)
{
this
.
_logger
.
LogInformation
(
$"Background worker
{
this
.
GetType
().
FullName
}
has been started."
);
var
token
=
(
CancellationToken
)
userObject
;
while
(!
token
.
IsCancellationRequested
&&
await
this
.
Process
())
{
}
this
.
_logger
.
LogInformation
(
$"Background worker
{
this
.
GetType
().
FullName
}
has been stopped."
);
}
protected
abstract
Task
<
bool
>
Process
();
#
region
IDisposable
// Flag: Has Dispose already been called?
private
bool
disposed
=
false
;
// Public implementation of Dispose pattern callable by consumers.
public
void
Dispose
()
{
Dispose
(
true
);
GC
.
SuppressFinalize
(
this
);
}
// Protected implementation of Dispose pattern.
protected
virtual
void
Dispose
(
bool
disposing
)
{
if
(
disposed
)
return
;
if
(
disposing
)
{
// Free any other managed objects here.
this
.
Stop
();
}
// Free any unmanaged objects here.
disposed
=
true
;
}
#
endregion
IDisposable
}
}
\ No newline at end of file
src/Cap.Consistency/EventBus/BrokeredMessage.cs
0 → 100644
View file @
70a4c54e
namespace
Cap.Consistency.EventBus
{
public
class
BrokeredMessage
{
public
byte
[]
Body
{
get
;
set
;
}
public
string
Type
{
get
;
set
;
}
}
}
\ No newline at end of file
src/Cap.Consistency/EventBus/EventBusBase.cs
0 → 100644
View file @
70a4c54e
using
System
;
using
System.Collections.Generic
;
using
System.Linq
;
using
System.Reflection
;
using
Microsoft.Extensions.Logging
;
namespace
Cap.Consistency.EventBus
{
/// <summary>
/// The EventBusBase class is the base class for all the IEventBus implementations.
/// </summary>
public
abstract
class
EventBusBase
:
BackgroundWorker
,
IEventBus
,
IDisposable
{
public
const
int
DefaultMaxPendingEventNumber
=
1024
*
1024
;
public
event
EventHandler
<
EventHandlerHolder
>
MessageReceieved
;
protected
readonly
object
_eventHandlerLock
=
new
object
();
protected
List
<
EventHandlerHolder
>
_eventHandlerList
=
new
List
<
EventHandlerHolder
>();
/// <summary>
/// The pending event number which does not yet dispatched.
/// </summary>
public
abstract
long
PendingEventNumber
{
get
;
}
public
virtual
bool
IsDispatcherEnabled
{
get
{
return
base
.
IsRunning
;
}
}
/// <summary>
/// The constructor of EventBusBase.
/// </summary>
/// <param name="loggerFactory"></param>
protected
EventBusBase
(
ILoggerFactory
loggerFactory
)
:
base
(
loggerFactory
)
{
this
.
_eventHandlerList
=
new
List
<
EventHandlerHolder
>();
}
/// <summary>
/// Post an event to the event bus, dispatched after the specific time.
/// </summary>
/// <remarks>If you do not need the event processed in the delivery order, use SimpleEventBus instead.</remarks>
/// <param name="eventObject">The event object</param>
/// <param name="dispatchDelay">The delay time before dispatch this event</param>
public
abstract
void
Post
(
object
eventObject
,
TimeSpan
dispatchDelay
);
/// <summary>
/// Register event handlers in the handler instance.
/// One handler instance may have many event handler methods.
/// These methods have EventSubscriberAttribute contract and exactly one parameter.
/// </summary>
/// <remarks>If you do not need the event processed in the delivery order, use SimpleEventBus instead.</remarks>
/// <param name="handler">The instance of event handler class</param>
public
void
Register
(
object
handler
)
{
if
(
handler
==
null
)
{
return
;
}
var
miList
=
handler
.
GetType
().
GetRuntimeMethods
();
lock
(
_eventHandlerLock
)
{
// Don't allow register multiple times.
if
(
_eventHandlerList
.
Any
(
record
=>
record
.
Handler
==
handler
))
{
return
;
}
List
<
EventHandlerHolder
>
newList
=
null
;
foreach
(
var
mi
in
miList
)
{
var
attribute
=
mi
.
GetCustomAttribute
<
EventSubscriberAttribute
>();
if
(
attribute
!=
null
)
{
var
piList
=
mi
.
GetParameters
();
if
(
piList
.
Length
==
1
)
{
// OK, we got valid handler, create newList as needed
if
(
newList
==
null
)
{
newList
=
new
List
<
EventHandlerHolder
>(
_eventHandlerList
);
}
newList
.
Add
(
this
.
CreateEventHandlerHolder
(
handler
,
mi
,
piList
[
0
].
ParameterType
));
}
}
}
// OK, we have new handler registered
if
(
newList
!=
null
)
{
_eventHandlerList
=
newList
;
}
}
}
/// <summary>
/// Unregister event handlers belong to the handler instance.
/// One handler instance may have many event handler methods.
/// These methods have EventSubscriberAttribute contract and exactly one parameter.
/// </summary>
/// <param name="handler">The instance of event handler class</param>
public
void
Unregister
(
object
handler
)
{
if
(
handler
==
null
)
{
return
;
}
lock
(
_eventHandlerLock
)
{
bool
needAction
=
_eventHandlerList
.
Any
(
record
=>
record
.
Handler
==
handler
);
if
(
needAction
)
{
var
newList
=
new
List
<
EventHandlerHolder
>();
foreach
(
var
record
in
this
.
_eventHandlerList
)
{
if
(
record
.
Handler
!=
handler
)
{
newList
.
Add
(
record
);
}
else
{
record
.
Dispose
();
}
}
_eventHandlerList
=
newList
;
}
}
}
protected
virtual
EventHandlerHolder
CreateEventHandlerHolder
(
object
handler
,
MethodInfo
methodInfo
,
Type
parameterType
)
{
return
new
EventHandlerHolder
(
handler
,
methodInfo
,
parameterType
);
}
protected
virtual
void
OnMessageReceieved
(
EventHandlerHolder
handler
)
{
this
.
MessageReceieved
?.
Invoke
(
this
,
handler
);
}
#
region
IDisposable
// Flag: Has Dispose already been called?
private
bool
disposed
=
false
;
// Public implementation of Dispose pattern callable by consumers.
public
new
void
Dispose
()
{
Dispose
(
true
);
GC
.
SuppressFinalize
(
this
);
}
// Protected implementation of Dispose pattern.
protected
new
virtual
void
Dispose
(
bool
disposing
)
{
if
(
disposed
)
return
;
if
(
disposing
)
{
// Free any other managed objects here.
this
.
Stop
();
}
// Free any unmanaged objects here.
disposed
=
true
;
}
#
endregion
IDisposable
}
}
\ No newline at end of file
src/Cap.Consistency/EventBus/EventBusFactory.cs
0 → 100644
View file @
70a4c54e
using
System
;
using
Microsoft.Extensions.Logging
;
namespace
Cap.Consistency.EventBus
{
public
class
EventBusFactory
:
IEventBusFactory
{
private
readonly
ILoggerFactory
_loggerFactory
;
public
EventBusFactory
(
ILoggerFactory
loggerFactory
)
{
this
.
_loggerFactory
=
loggerFactory
;
}
public
IEventBus
CreateEventBus
<
TEventBus
>()
where
TEventBus
:
IEventBus
{
return
this
.
CreateEventBus
<
TEventBus
>(-
1
);
}
public
IEventBus
CreateEventBus
<
TEventBus
>(
long
maxPendingEventNumber
)
where
TEventBus
:
IEventBus
{
return
Activator
.
CreateInstance
(
typeof
(
TEventBus
),
new
object
[]
{
this
.
_loggerFactory
,
maxPendingEventNumber
})
as
IEventBus
;
}
}
}
\ No newline at end of file
src/Cap.Consistency/EventBus/EventBusOptions.cs
0 → 100644
View file @
70a4c54e
namespace
Cap.Consistency.EventBus
{
public
class
EventBusOptions
{
public
long
MaxPendingEventNumber
{
get
;
set
;
}
public
int
MaxPendingEventNumber32
{
get
{
if
(
this
.
MaxPendingEventNumber
<
int
.
MaxValue
)
{
return
(
int
)
this
.
MaxPendingEventNumber
;
}
return
int
.
MaxValue
;
}
}
public
EventBusOptions
()
{
this
.
MaxPendingEventNumber
=
EventBusBase
.
DefaultMaxPendingEventNumber
;
}
}
}
\ No newline at end of file
src/Cap.Consistency/EventBus/EventHandlerHolder.cs
0 → 100644
View file @
70a4c54e
using
System
;
using
System.Reflection
;
namespace
Cap.Consistency.EventBus
{
public
class
EventHandlerHolder
:
IDisposable
{
public
object
Handler
{
get
;
}
public
MethodInfo
MethodInfo
{
get
;
}
public
Type
ParameterType
{
get
;
}
public
EventHandlerHolder
(
object
handler
,
MethodInfo
methodInfo
,
Type
parameterType
)
{
Handler
=
handler
;
MethodInfo
=
methodInfo
;
ParameterType
=
parameterType
;
}
#
region
IDisposable
// Flag: Has Dispose already been called?
private
bool
disposed
=
false
;
// Public implementation of Dispose pattern callable by consumers.
public
void
Dispose
()
{
Dispose
(
true
);
GC
.
SuppressFinalize
(
this
);
}
// Protected implementation of Dispose pattern.
protected
virtual
void
Dispose
(
bool
disposing
)
{
if
(
disposed
)
return
;
if
(
disposing
)
{
// Free any other managed objects here.
//
}
// Free any unmanaged objects here.
//
disposed
=
true
;
}
#
endregion
IDisposable
}
}
\ No newline at end of file
src/Cap.Consistency/EventBus/EventSubscriberAttribute.cs
0 → 100644
View file @
70a4c54e
using
System
;
namespace
Cap.Consistency.EventBus
{
/// <summary>
/// The attribute class of the event handler.
/// If a method have this attribue contract and exactly one parameter, then it's event handler.
/// </summary>
public
class
EventSubscriberAttribute
:
Attribute
{
}
}
\ No newline at end of file
src/Cap.Consistency/EventBus/IBackgroundWorker.cs
0 → 100644
View file @
70a4c54e
namespace
Cap.Consistency.EventBus
{
public
interface
IBackgroundWorker
{
bool
IsRunning
{
get
;
}
/// <summary>
/// Start the background worker digest loop.
/// </summary>
void
Start
();
/// <summary>
/// Stop the background worker digest loop.
/// </summary>
/// <param name="timeout"></param>
void
Stop
(
int
timeout
=
2000
);
}
}
\ No newline at end of file
src/Cap.Consistency/EventBus/IEventBus.cs
0 → 100644
View file @
70a4c54e
using
System
;
namespace
Cap.Consistency.EventBus
{
/// <summary>
/// The event bus interface.
/// </summary>
public
interface
IEventBus
:
IBackgroundWorker
{
/// <summary>
/// Post an event to the event bus, dispatched after the specific time.
/// </summary>
/// <param name="eventObject">The event object</param>
/// <param name="dispatchDelay">The delay time before dispatch this event</param>
void
Post
(
object
eventObject
,
TimeSpan
dispatchDelay
);
/// <summary>
/// Register event handlers in the handler instance.
///
/// One handler instance may have many event handler methods.
/// These methods have EventSubscriberAttribute contract and exactly one parameter.
/// </summary>
/// <param name="handler">The instance of event handler class</param>
void
Register
(
object
handler
);
/// <summary>
/// Unregister event handlers belong to the handler instance.
///
/// One handler instance may have many event handler methods.
/// These methods have EventSubscriberAttribute contract and exactly one parameter.
/// </summary>
/// <param name="handler">The instance of event handler class</param>
void
Unregister
(
object
handler
);
}
}
\ No newline at end of file
src/Cap.Consistency/EventBus/IEventBusFactory.cs
0 → 100644
View file @
70a4c54e
namespace
Cap.Consistency.EventBus
{
public
interface
IEventBusFactory
{
IEventBus
CreateEventBus
<
TEventBus
>()
where
TEventBus
:
IEventBus
;
IEventBus
CreateEventBus
<
TEventBus
>(
long
maxPendingEventNumber
)
where
TEventBus
:
IEventBus
;
}
}
\ No newline at end of file
src/Cap.Consistency/EventBus/Ordered/OrderedEventBus.cs
0 → 100644
View file @
70a4c54e
using
System
;
using
System.Collections.Concurrent
;
using
System.Collections.Generic
;
using
System.Reflection
;
using
System.Threading.Tasks
;
using
Microsoft.Extensions.Logging
;
using
Microsoft.Extensions.Options
;
namespace
Cap.Consistency.EventBus
{
/// <summary>
/// The OrderedEventBus class is a simple and fast IEventBus implementation which processes event in the delivery order.
/// </summary>
/// <remarks>If you do not need the event processed in the delivery order, use SimpleEventBus instead.</remarks>
public
class
OrderedEventBus
:
EventBusBase
,
IEventBus
{
private
readonly
BlockingCollection
<
object
>
_eventQueue
;
/// <summary>
/// The pending event number which does not yet dispatched.
/// </summary>
public
override
long
PendingEventNumber
{
get
{
return
Math
.
Max
(
_eventQueue
.
Count
,
0
);
}
}
public
override
bool
IsDispatcherEnabled
{
get
{
return
true
;
}
}
public
OrderedEventBus
(
ILoggerFactory
loggerFactory
,
IOptions
<
EventBusOptions
>
options
)
:
this
(
loggerFactory
,
options
?.
Value
.
MaxPendingEventNumber32
??
0
)
{
}
/// <summary>
/// The constructor of OrderedEventBus.
/// </summary>
/// <param name="maxPendingEventNumber">The maximum pending event number which does not yet dispatched</param>
public
OrderedEventBus
(
ILoggerFactory
loggerFactory
,
int
maxPendingEventNumber
,
bool
shouldStart
=
true
)
:
base
(
loggerFactory
)
{
this
.
_eventQueue
=
new
BlockingCollection
<
object
>(
maxPendingEventNumber
>
0
?
maxPendingEventNumber
:
DefaultMaxPendingEventNumber
);
if
(
shouldStart
)
{
this
.
Start
();
}
}
/// <summary>
/// Post an event to the event bus, dispatched after the specific time.
/// </summary>
/// <remarks>If you do not need the event processed in the delivery order, use SimpleEventBus instead.</remarks>
/// <param name="eventObject">The event object</param>
/// <param name="dispatchDelay">The delay time before dispatch this event</param>
public
override
void
Post
(
object
eventObject
,
TimeSpan
dispatchDelay
)
{
int
dispatchDelayMs
=
(
int
)
dispatchDelay
.
TotalMilliseconds
;
if
(
dispatchDelayMs
>=
1
)
{
Task
.
Delay
(
dispatchDelayMs
).
ContinueWith
(
task
=>
_eventQueue
.
Add
(
eventObject
));
}
else
{
_eventQueue
.
Add
(
eventObject
);
}
}
protected
override
async
Task
<
bool
>
Process
()
{
object
eventObject
=
null
;
try
{
eventObject
=
_eventQueue
.
Take
();
InvokeEventHandler
(
eventObject
);
}
catch
(
Exception
de
)
{
if
(
de
is
ObjectDisposedException
)
{
return
await
Task
.
FromResult
(
true
);
}
this
.
_logger
.
LogError
(
"Dispatch event ({0}) failed: {1}{2}{3}"
,
eventObject
,
de
.
Message
,
Environment
.
NewLine
,
de
.
StackTrace
);
}
return
await
Task
.
FromResult
(
true
);
}
protected
void
InvokeEventHandler
(
object
eventObject
,
Action
<
bool
,
Exception
,
object
,
Type
>
resultCallback
=
null
)
{
List
<
Task
>
taskList
=
null
;
// ReSharper disable once ForCanBeConvertedToForeach
for
(
int
i
=
0
;
i
<
_eventHandlerList
.
Count
;
i
++)
{
// ReSharper disable once InconsistentlySynchronizedField
EventHandlerHolder
record
=
_eventHandlerList
[
i
];
if
(
eventObject
==
null
||
record
.
ParameterType
.
IsInstanceOfType
(
eventObject
))
{
Task
task
=
Task
.
Run
(()
=>
{
this
.
OnMessageReceieved
(
record
);
var
isVoid
=
record
.
MethodInfo
.
ReturnType
==
typeof
(
void
);
try
{
var
result
=
record
.
MethodInfo
.
Invoke
(
record
.
Handler
,
new
[]
{
eventObject
});
resultCallback
?.
Invoke
(
isVoid
,
null
,
result
,
record
.
MethodInfo
.
ReturnType
);
}
catch
(
Exception
ex
)
{
this
.
_logger
.
LogError
(
ex
.
Message
+
Environment
.
NewLine
+
ex
.
StackTrace
);
resultCallback
?.
Invoke
(
isVoid
,
ex
,
null
,
record
.
MethodInfo
.
ReturnType
);
}
});
if
(
taskList
==
null
)
taskList
=
new
List
<
Task
>();
taskList
.
Add
(
task
);
//record.MethodInfo.Invoke(record.Handler, new[] { eventObject });
}
}
if
(
taskList
!=
null
)
{
Task
.
WaitAll
(
taskList
.
ToArray
());
}
else
{
resultCallback
?.
Invoke
(
false
,
null
,
null
,
null
);
}
}
}
}
\ No newline at end of file
src/Cap.Consistency/EventBus/ReceiveResult.cs
0 → 100644
View file @
70a4c54e
using
System
;
namespace
Cap.Consistency.EventBus
{
public
class
ReceiveResult
{
public
bool
IsSucceeded
{
get
;
set
;
}
public
bool
IsVoid
{
get
;
set
;
}
public
object
Result
{
get
;
set
;
}
public
Type
ResultType
{
get
;
set
;
}
public
Exception
Exception
{
get
;
set
;
}
public
ReceiveResult
(
bool
isSucceeded
,
bool
isVoid
,
object
result
,
Exception
ex
=
null
,
Type
resultType
=
null
)
{
this
.
IsSucceeded
=
isSucceeded
;
this
.
IsVoid
=
isVoid
;
this
.
Result
=
result
;
this
.
Exception
=
ex
;
this
.
ResultType
=
(
resultType
??
result
?.
GetType
())
??
typeof
(
object
);
}
}
}
\ No newline at end of file
src/Cap.Consistency/EventBus/Simple/SimpleEventBus.cs
0 → 100644
View file @
70a4c54e
//#define UseTotalEventNumber
using
System
;
using
System.Reflection
;
using
System.Threading
;
using
System.Threading.Tasks
;
using
Microsoft.Extensions.Logging
;
using
Microsoft.Extensions.Options
;
namespace
Cap.Consistency.EventBus
{
/// <summary>
/// The SimpleEventBus class is a simple and fast IEventBus implementation.
/// </summary>
/// <remarks>
/// <para>The event may be processed out of the delivery order under heavy load.</para>
/// <para>If you need the event processed in the delivery order, use OrderedEventBus instead.</para>
/// </remarks>
public
class
SimpleEventBus
:
EventBusBase
,
IEventBus
{
private
readonly
long
_maxPendingEventNumber
;
// Interlocked operation cause the performance drop at least 10% !.
private
long
_pendingEventNumber
;
private
bool
_isDispatcherEnabled
;
#if UseTotalEventNumber
// This counter cause the performance drop at least 5% !
private
long
_totalEventNumber
;
// The total event number which post to the event bus.
// This counter cause the performance drop at least 5% !
public
long
TotalEventNumber
{
get
{
return
Interlocked
.
Read
(
ref
_totalEventNumber
);
}
}
#endif
/// <summary>
/// The pending event number which does not yet dispatched.
/// </summary>
public
override
long
PendingEventNumber
{
get
{
return
Math
.
Max
(
Interlocked
.
Read
(
ref
_pendingEventNumber
),
0
);
}
}
public
override
bool
IsDispatcherEnabled
{
get
{
return
this
.
_isDispatcherEnabled
;
}
}
public
SimpleEventBus
(
ILoggerFactory
loggerFactory
,
IOptions
<
EventBusOptions
>
options
)
:
this
(
loggerFactory
,
options
?.
Value
.
MaxPendingEventNumber
??
0
)
{
}
/// <summary>
/// The constructor of SimpleEventBus.
/// </summary>
/// <param name="maxPendingEventNumber">The maximum pending event number which does not yet dispatched</param>
public
SimpleEventBus
(
ILoggerFactory
loggerFactory
,
long
maxPendingEventNumber
,
bool
shouldStart
=
true
)
:
base
(
loggerFactory
)
{
this
.
_maxPendingEventNumber
=
maxPendingEventNumber
>
0
?
maxPendingEventNumber
:
DefaultMaxPendingEventNumber
;
this
.
_isDispatcherEnabled
=
false
;
if
(
shouldStart
)
{
this
.
Start
();
}
}
public
override
void
Start
()
{
if
(
this
.
IsRunning
)
{
return
;
}
this
.
_isDispatcherEnabled
=
true
;
}
public
override
void
Stop
(
int
timeout
=
2000
)
{
this
.
_isDispatcherEnabled
=
false
;
}
/// <summary>
/// Post an event to the event bus, dispatched after the specific time.
/// </summary>
/// <remarks>
/// <para>The event may be processed out of the delivery order under heavy load.</para>
/// <para>If you need the event processed in the delivery order, use OrderedEventBus instead.</para>
/// </remarks>
/// <param name="eventObject">The event object</param>
/// <param name="dispatchDelay">The delay time before dispatch this event</param>
public
override
void
Post
(
object
eventObject
,
TimeSpan
dispatchDelay
)
{
if
(!
this
.
_isDispatcherEnabled
)
return
;
int
dispatchDelayMs
=
(
int
)
dispatchDelay
.
TotalMilliseconds
;
while
(
Interlocked
.
Read
(
ref
_pendingEventNumber
)
>=
_maxPendingEventNumber
)
{
this
.
_logger
.
LogWarning
(
"Too many events in the EventBus, pendingEventNumber={0}, maxPendingEventNumber={1}{2}PendingEvent='{3}', dispatchDelay={4}ms"
,
PendingEventNumber
,
_maxPendingEventNumber
,
Environment
.
NewLine
,
eventObject
,
dispatchDelayMs
);
Task
.
Delay
(
16
).
Wait
();
}
if
(
dispatchDelayMs
>=
1
)
{
Task
.
Delay
(
dispatchDelayMs
).
ContinueWith
(
task
=>
{
DispatchMessage
(
eventObject
);
});
}
else
{
Task
.
Run
(()
=>
DispatchMessage
(
eventObject
));
}
Interlocked
.
Increment
(
ref
_pendingEventNumber
);
// Interlocked.Increment(ref _totalEventNumber);
}
protected
override
Task
ThreadWorker
(
object
userObject
)
{
throw
new
NotSupportedException
();
}
protected
override
Task
<
bool
>
Process
()
{
throw
new
NotSupportedException
();
}
protected
void
DispatchMessage
(
object
eventObject
)
{
try
{
// ReSharper disable once ForCanBeConvertedToForeach
for
(
int
i
=
0
;
i
<
_eventHandlerList
.
Count
;
i
++)
{
// ReSharper disable once InconsistentlySynchronizedField
EventHandlerHolder
record
=
_eventHandlerList
[
i
];
if
(
eventObject
==
null
||
record
.
ParameterType
.
IsInstanceOfType
(
eventObject
))
{
Task
.
Run
(()
=>
{
try
{
this
.
OnMessageReceieved
(
record
);
record
.
MethodInfo
.
Invoke
(
record
.
Handler
,
new
[]
{
eventObject
});
}
catch
(
Exception
ie
)
{
this
.
_logger
.
LogWarning
(
"Event handler (class '{0}@{1}', method '{2}') failed: {3}{4}{5}{4}eventObject: {6}"
,
record
.
Handler
.
GetType
(),
record
.
Handler
.
GetHashCode
(),
record
.
MethodInfo
,
ie
.
Message
,
Environment
.
NewLine
,
ie
.
StackTrace
,
eventObject
);
}
});
}
}
}
catch
(
Exception
de
)
{
this
.
_logger
.
LogError
(
"Dispatch event ({0}) failed: {1}{2}{3}"
,
eventObject
,
de
.
Message
,
Environment
.
NewLine
,
de
.
StackTrace
);
}
finally
{
Interlocked
.
Decrement
(
ref
_pendingEventNumber
);
}
}
}
}
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment