Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
CAP
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
tsai
CAP
Commits
b67e8ebc
Commit
b67e8ebc
authored
Jul 23, 2018
by
Savorboard
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add copyright license header to code file.
parent
b886cb3e
Changes
10
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
152 additions
and
72 deletions
+152
-72
CAP.MongoDBCapOptionsExtension.cs
src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs
+3
-1
CAP.MongoDBOptions.cs
src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs
+2
-1
CAP.Options.Extensions.cs
src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs
+3
-0
CapPublisher.cs
src/DotNetCore.CAP.MongoDB/CapPublisher.cs
+28
-16
MongoDBCollectProcessor.cs
src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs
+11
-5
MongoDBMonitoringApi.cs
src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs
+60
-24
MongoDBStorage.cs
src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs
+9
-5
MongoDBStorageConnection.cs
src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs
+18
-13
MongoDBStorageTransaction.cs
src/DotNetCore.CAP.MongoDB/MongoDBStorageTransaction.cs
+5
-2
MongoDBUtil.cs
src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs
+13
-5
No files found.
src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs
View file @
b67e8ebc
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using
System
;
using
System
;
using
DotNetCore.CAP
;
using
DotNetCore.CAP.Processor
;
using
DotNetCore.CAP.Processor
;
using
Microsoft.Extensions.DependencyInjection
;
using
Microsoft.Extensions.DependencyInjection
;
...
...
src/DotNetCore.CAP.MongoDB/CAP.MongoDBOptions.cs
View file @
b67e8ebc
using
System
;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
namespace
DotNetCore.CAP.MongoDB
namespace
DotNetCore.CAP.MongoDB
{
{
...
...
src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs
View file @
b67e8ebc
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using
System
;
using
System
;
using
DotNetCore.CAP
;
using
DotNetCore.CAP
;
using
DotNetCore.CAP.MongoDB
;
using
DotNetCore.CAP.MongoDB
;
...
...
src/DotNetCore.CAP.MongoDB/CapPublisher.cs
View file @
b67e8ebc
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using
System
;
using
System
;
using
System.Data
;
using
System.Data
;
using
System.Threading.Tasks
;
using
System.Threading.Tasks
;
...
@@ -12,8 +15,8 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -12,8 +15,8 @@ namespace DotNetCore.CAP.MongoDB
{
{
public
class
CapPublisher
:
CapPublisherBase
,
ICallbackPublisher
public
class
CapPublisher
:
CapPublisherBase
,
ICallbackPublisher
{
{
private
readonly
MongoDBOptions
_options
;
private
readonly
IMongoDatabase
_database
;
private
readonly
IMongoDatabase
_database
;
private
readonly
MongoDBOptions
_options
;
private
bool
_isInTransaction
=
true
;
private
bool
_isInTransaction
=
true
;
public
CapPublisher
(
public
CapPublisher
(
...
@@ -37,22 +40,25 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -37,22 +40,25 @@ namespace DotNetCore.CAP.MongoDB
Enqueue
(
message
);
Enqueue
(
message
);
}
}
protected
override
int
Execute
(
IDbConnection
dbConnection
,
IDbTransaction
dbTransaction
,
CapPublishedMessage
message
)
protected
override
int
Execute
(
IDbConnection
dbConnection
,
IDbTransaction
dbTransaction
,
CapPublishedMessage
message
)
{
{
throw
new
System
.
NotImplementedException
(
"Not work for MongoDB"
);
throw
new
NotImplementedException
(
"Not work for MongoDB"
);
}
}
protected
override
Task
<
int
>
ExecuteAsync
(
IDbConnection
dbConnection
,
IDbTransaction
dbTransaction
,
CapPublishedMessage
message
)
protected
override
Task
<
int
>
ExecuteAsync
(
IDbConnection
dbConnection
,
IDbTransaction
dbTransaction
,
CapPublishedMessage
message
)
{
{
throw
new
System
.
NotImplementedException
(
"Not work for MongoDB"
);
throw
new
NotImplementedException
(
"Not work for MongoDB"
);
}
}
protected
override
void
PrepareConnectionForEF
()
protected
override
void
PrepareConnectionForEF
()
{
{
throw
new
System
.
NotImplementedException
(
"Not work for MongoDB"
);
throw
new
NotImplementedException
(
"Not work for MongoDB"
);
}
}
public
override
void
PublishWithMongo
<
T
>(
string
name
,
T
contentObj
,
object
mongoSession
=
null
,
string
callbackName
=
null
)
public
override
void
PublishWithMongo
<
T
>(
string
name
,
T
contentObj
,
object
mongoSession
=
null
,
string
callbackName
=
null
)
{
{
var
session
=
mongoSession
as
IClientSessionHandle
;
var
session
=
mongoSession
as
IClientSessionHandle
;
if
(
session
==
null
)
if
(
session
==
null
)
...
@@ -60,10 +66,11 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -60,10 +66,11 @@ namespace DotNetCore.CAP.MongoDB
_isInTransaction
=
false
;
_isInTransaction
=
false
;
}
}
PublishWithSession
<
T
>
(
name
,
contentObj
,
session
,
callbackName
);
PublishWithSession
(
name
,
contentObj
,
session
,
callbackName
);
}
}
public
override
async
Task
PublishWithMongoAsync
<
T
>(
string
name
,
T
contentObj
,
object
mongoSession
=
null
,
string
callbackName
=
null
)
public
override
async
Task
PublishWithMongoAsync
<
T
>(
string
name
,
T
contentObj
,
object
mongoSession
=
null
,
string
callbackName
=
null
)
{
{
var
session
=
mongoSession
as
IClientSessionHandle
;
var
session
=
mongoSession
as
IClientSessionHandle
;
if
(
session
==
null
)
if
(
session
==
null
)
...
@@ -71,12 +78,12 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -71,12 +78,12 @@ namespace DotNetCore.CAP.MongoDB
_isInTransaction
=
false
;
_isInTransaction
=
false
;
}
}
await
PublishWithSessionAsync
<
T
>
(
name
,
contentObj
,
session
,
callbackName
);
await
PublishWithSessionAsync
(
name
,
contentObj
,
session
,
callbackName
);
}
}
private
void
PublishWithSession
<
T
>(
string
name
,
T
contentObj
,
IClientSessionHandle
session
,
string
callbackName
)
private
void
PublishWithSession
<
T
>(
string
name
,
T
contentObj
,
IClientSessionHandle
session
,
string
callbackName
)
{
{
Guid
operationId
=
default
(
Guid
);
var
operationId
=
default
(
Guid
);
var
content
=
Serialize
(
contentObj
,
callbackName
);
var
content
=
Serialize
(
contentObj
,
callbackName
);
...
@@ -100,7 +107,7 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -100,7 +107,7 @@ namespace DotNetCore.CAP.MongoDB
Enqueue
(
message
);
Enqueue
(
message
);
}
}
}
}
catch
(
System
.
Exception
e
)
catch
(
Exception
e
)
{
{
_logger
.
LogError
(
e
,
"An exception was occurred when publish message. message:"
+
name
);
_logger
.
LogError
(
e
,
"An exception was occurred when publish message. message:"
+
name
);
s_diagnosticListener
.
WritePublishMessageStoreError
(
operationId
,
message
,
e
);
s_diagnosticListener
.
WritePublishMessageStoreError
(
operationId
,
message
,
e
);
...
@@ -121,12 +128,14 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -121,12 +128,14 @@ namespace DotNetCore.CAP.MongoDB
{
{
collection
.
InsertOne
(
message
);
collection
.
InsertOne
(
message
);
}
}
return
message
.
Id
;
return
message
.
Id
;
}
}
private
async
Task
PublishWithSessionAsync
<
T
>(
string
name
,
T
contentObj
,
IClientSessionHandle
session
,
string
callbackName
)
private
async
Task
PublishWithSessionAsync
<
T
>(
string
name
,
T
contentObj
,
IClientSessionHandle
session
,
string
callbackName
)
{
{
Guid
operationId
=
default
(
Guid
);
var
operationId
=
default
(
Guid
);
var
content
=
Serialize
(
contentObj
,
callbackName
);
var
content
=
Serialize
(
contentObj
,
callbackName
);
var
message
=
new
CapPublishedMessage
var
message
=
new
CapPublishedMessage
...
@@ -152,7 +161,7 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -152,7 +161,7 @@ namespace DotNetCore.CAP.MongoDB
Enqueue
(
message
);
Enqueue
(
message
);
}
}
}
}
catch
(
System
.
Exception
e
)
catch
(
Exception
e
)
{
{
_logger
.
LogError
(
e
,
"An exception was occurred when publish message async. exception message:"
+
name
);
_logger
.
LogError
(
e
,
"An exception was occurred when publish message async. exception message:"
+
name
);
s_diagnosticListener
.
WritePublishMessageStoreError
(
operationId
,
message
,
e
);
s_diagnosticListener
.
WritePublishMessageStoreError
(
operationId
,
message
,
e
);
...
@@ -160,9 +169,11 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -160,9 +169,11 @@ namespace DotNetCore.CAP.MongoDB
throw
;
throw
;
}
}
}
}
private
async
Task
<
int
>
ExecuteAsync
(
IClientSessionHandle
session
,
CapPublishedMessage
message
)
private
async
Task
<
int
>
ExecuteAsync
(
IClientSessionHandle
session
,
CapPublishedMessage
message
)
{
{
message
.
Id
=
await
new
MongoDBUtil
().
GetNextSequenceValueAsync
(
_database
,
_options
.
PublishedCollection
,
session
);
message
.
Id
=
await
new
MongoDBUtil
().
GetNextSequenceValueAsync
(
_database
,
_options
.
PublishedCollection
,
session
);
var
collection
=
_database
.
GetCollection
<
CapPublishedMessage
>(
_options
.
PublishedCollection
);
var
collection
=
_database
.
GetCollection
<
CapPublishedMessage
>(
_options
.
PublishedCollection
);
if
(
_isInTransaction
)
if
(
_isInTransaction
)
{
{
...
@@ -172,6 +183,7 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -172,6 +183,7 @@ namespace DotNetCore.CAP.MongoDB
{
{
await
collection
.
InsertOneAsync
(
message
);
await
collection
.
InsertOneAsync
(
message
);
}
}
return
message
.
Id
;
return
message
.
Id
;
}
}
}
}
...
...
src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs
View file @
b67e8ebc
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using
System
;
using
System
;
using
System.Threading.Tasks
;
using
System.Threading.Tasks
;
using
DotNetCore.CAP.Models
;
using
DotNetCore.CAP.Models
;
...
@@ -9,9 +12,9 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -9,9 +12,9 @@ namespace DotNetCore.CAP.MongoDB
{
{
public
class
MongoDBCollectProcessor
:
ICollectProcessor
public
class
MongoDBCollectProcessor
:
ICollectProcessor
{
{
private
readonly
MongoDBOptions
_options
;
private
readonly
ILogger
_logger
;
private
readonly
IMongoDatabase
_database
;
private
readonly
IMongoDatabase
_database
;
private
readonly
ILogger
_logger
;
private
readonly
MongoDBOptions
_options
;
private
readonly
TimeSpan
_waitingInterval
=
TimeSpan
.
FromMinutes
(
5
);
private
readonly
TimeSpan
_waitingInterval
=
TimeSpan
.
FromMinutes
(
5
);
public
MongoDBCollectProcessor
(
ILogger
<
MongoDBCollectProcessor
>
logger
,
public
MongoDBCollectProcessor
(
ILogger
<
MongoDBCollectProcessor
>
logger
,
...
@@ -25,18 +28,21 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -25,18 +28,21 @@ namespace DotNetCore.CAP.MongoDB
public
async
Task
ProcessAsync
(
ProcessingContext
context
)
public
async
Task
ProcessAsync
(
ProcessingContext
context
)
{
{
_logger
.
LogDebug
(
$"Collecting expired data from collection [
{
_options
.
Database
}
].[
{
_options
.
PublishedCollection
}
]."
);
_logger
.
LogDebug
(
$"Collecting expired data from collection [
{
_options
.
Database
}
].[
{
_options
.
PublishedCollection
}
]."
);
var
publishedCollection
=
_database
.
GetCollection
<
CapPublishedMessage
>(
_options
.
PublishedCollection
);
var
publishedCollection
=
_database
.
GetCollection
<
CapPublishedMessage
>(
_options
.
PublishedCollection
);
var
receivedCollection
=
_database
.
GetCollection
<
CapReceivedMessage
>(
_options
.
ReceivedCollection
);
var
receivedCollection
=
_database
.
GetCollection
<
CapReceivedMessage
>(
_options
.
ReceivedCollection
);
await
publishedCollection
.
BulkWriteAsync
(
new
[]
await
publishedCollection
.
BulkWriteAsync
(
new
[]
{
{
new
DeleteManyModel
<
CapPublishedMessage
>(
Builders
<
CapPublishedMessage
>.
Filter
.
Lt
(
x
=>
x
.
ExpiresAt
,
DateTime
.
Now
))
new
DeleteManyModel
<
CapPublishedMessage
>(
Builders
<
CapPublishedMessage
>.
Filter
.
Lt
(
x
=>
x
.
ExpiresAt
,
DateTime
.
Now
))
});
});
await
receivedCollection
.
BulkWriteAsync
(
new
[]
await
receivedCollection
.
BulkWriteAsync
(
new
[]
{
{
new
DeleteManyModel
<
CapReceivedMessage
>(
Builders
<
CapReceivedMessage
>.
Filter
.
Lt
(
x
=>
x
.
ExpiresAt
,
DateTime
.
Now
))
new
DeleteManyModel
<
CapReceivedMessage
>(
Builders
<
CapReceivedMessage
>.
Filter
.
Lt
(
x
=>
x
.
ExpiresAt
,
DateTime
.
Now
))
});
});
await
context
.
WaitAsync
(
_waitingInterval
);
await
context
.
WaitAsync
(
_waitingInterval
);
...
...
src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs
View file @
b67e8ebc
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using
System
;
using
System
;
using
System.Collections.Generic
;
using
System.Collections.Generic
;
using
System.Linq
;
using
DotNetCore.CAP.Dashboard
;
using
DotNetCore.CAP.Dashboard
;
using
DotNetCore.CAP.Dashboard.Monitoring
;
using
DotNetCore.CAP.Dashboard.Monitoring
;
using
DotNetCore.CAP.Infrastructure
;
using
DotNetCore.CAP.Infrastructure
;
...
@@ -12,8 +14,8 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -12,8 +14,8 @@ namespace DotNetCore.CAP.MongoDB
{
{
public
class
MongoDBMonitoringApi
:
IMonitoringApi
public
class
MongoDBMonitoringApi
:
IMonitoringApi
{
{
private
readonly
MongoDBOptions
_options
;
private
readonly
IMongoDatabase
_database
;
private
readonly
IMongoDatabase
_database
;
private
readonly
MongoDBOptions
_options
;
public
MongoDBMonitoringApi
(
IMongoClient
client
,
MongoDBOptions
options
)
public
MongoDBMonitoringApi
(
IMongoClient
client
,
MongoDBOptions
options
)
{
{
...
@@ -31,20 +33,34 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -31,20 +33,34 @@ namespace DotNetCore.CAP.MongoDB
var
statistics
=
new
StatisticsDto
();
var
statistics
=
new
StatisticsDto
();
{
{
if
(
int
.
TryParse
(
publishedCollection
.
CountDocuments
(
x
=>
x
.
StatusName
==
StatusName
.
Succeeded
).
ToString
(),
out
var
count
))
if
(
int
.
TryParse
(
publishedCollection
.
CountDocuments
(
x
=>
x
.
StatusName
==
StatusName
.
Succeeded
).
ToString
(),
out
var
count
))
{
statistics
.
PublishedSucceeded
=
count
;
statistics
.
PublishedSucceeded
=
count
;
}
}
}
{
{
if
(
int
.
TryParse
(
publishedCollection
.
CountDocuments
(
x
=>
x
.
StatusName
==
StatusName
.
Failed
).
ToString
(),
out
var
count
))
if
(
int
.
TryParse
(
publishedCollection
.
CountDocuments
(
x
=>
x
.
StatusName
==
StatusName
.
Failed
).
ToString
(),
out
var
count
))
{
statistics
.
PublishedFailed
=
count
;
statistics
.
PublishedFailed
=
count
;
}
}
}
{
{
if
(
int
.
TryParse
(
receivedCollection
.
CountDocuments
(
x
=>
x
.
StatusName
==
StatusName
.
Succeeded
).
ToString
(),
out
var
count
))
if
(
int
.
TryParse
(
receivedCollection
.
CountDocuments
(
x
=>
x
.
StatusName
==
StatusName
.
Succeeded
).
ToString
(),
out
var
count
))
{
statistics
.
ReceivedSucceeded
=
count
;
statistics
.
ReceivedSucceeded
=
count
;
}
}
}
{
{
if
(
int
.
TryParse
(
receivedCollection
.
CountDocuments
(
x
=>
x
.
StatusName
==
StatusName
.
Failed
).
ToString
(),
out
var
count
))
if
(
int
.
TryParse
(
receivedCollection
.
CountDocuments
(
x
=>
x
.
StatusName
==
StatusName
.
Failed
).
ToString
(),
out
var
count
))
{
statistics
.
ReceivedFailed
=
count
;
statistics
.
ReceivedFailed
=
count
;
}
}
}
return
statistics
;
return
statistics
;
...
@@ -64,7 +80,9 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -64,7 +80,9 @@ namespace DotNetCore.CAP.MongoDB
{
{
queryDto
.
StatusName
=
StatusName
.
Standardized
(
queryDto
.
StatusName
);
queryDto
.
StatusName
=
StatusName
.
Standardized
(
queryDto
.
StatusName
);
var
name
=
queryDto
.
MessageType
==
MessageType
.
Publish
?
_options
.
PublishedCollection
:
_options
.
ReceivedCollection
;
var
name
=
queryDto
.
MessageType
==
MessageType
.
Publish
?
_options
.
PublishedCollection
:
_options
.
ReceivedCollection
;
var
collection
=
_database
.
GetCollection
<
MessageDto
>(
name
);
var
collection
=
_database
.
GetCollection
<
MessageDto
>(
name
);
var
builder
=
Builders
<
MessageDto
>.
Filter
;
var
builder
=
Builders
<
MessageDto
>.
Filter
;
...
@@ -73,14 +91,17 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -73,14 +91,17 @@ namespace DotNetCore.CAP.MongoDB
{
{
filter
=
filter
&
builder
.
Eq
(
x
=>
x
.
StatusName
,
queryDto
.
StatusName
);
filter
=
filter
&
builder
.
Eq
(
x
=>
x
.
StatusName
,
queryDto
.
StatusName
);
}
}
if
(!
string
.
IsNullOrEmpty
(
queryDto
.
Name
))
if
(!
string
.
IsNullOrEmpty
(
queryDto
.
Name
))
{
{
filter
=
filter
&
builder
.
Eq
(
x
=>
x
.
Name
,
queryDto
.
Name
);
filter
=
filter
&
builder
.
Eq
(
x
=>
x
.
Name
,
queryDto
.
Name
);
}
}
if
(!
string
.
IsNullOrEmpty
(
queryDto
.
Group
))
if
(!
string
.
IsNullOrEmpty
(
queryDto
.
Group
))
{
{
filter
=
filter
&
builder
.
Eq
(
x
=>
x
.
Group
,
queryDto
.
Group
);
filter
=
filter
&
builder
.
Eq
(
x
=>
x
.
Group
,
queryDto
.
Group
);
}
}
if
(!
string
.
IsNullOrEmpty
(
queryDto
.
Content
))
if
(!
string
.
IsNullOrEmpty
(
queryDto
.
Content
))
{
{
filter
=
filter
&
builder
.
Regex
(
x
=>
x
.
Content
,
".*"
+
queryDto
.
Content
+
".*"
);
filter
=
filter
&
builder
.
Regex
(
x
=>
x
.
Content
,
".*"
+
queryDto
.
Content
+
".*"
);
...
@@ -119,52 +140,66 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -119,52 +140,66 @@ namespace DotNetCore.CAP.MongoDB
private
int
GetNumberOfMessage
(
string
collectionName
,
string
statusName
)
private
int
GetNumberOfMessage
(
string
collectionName
,
string
statusName
)
{
{
var
collection
=
_database
.
GetCollection
<
BsonDocument
>(
collectionName
);
var
collection
=
_database
.
GetCollection
<
BsonDocument
>(
collectionName
);
var
count
=
collection
.
CountDocuments
(
new
BsonDocument
{
{
"StatusName"
,
statusName
}
});
var
count
=
collection
.
CountDocuments
(
new
BsonDocument
{
{
"StatusName"
,
statusName
}
});
return
int
.
Parse
(
count
.
ToString
());
return
int
.
Parse
(
count
.
ToString
());
}
}
private
IDictionary
<
DateTime
,
int
>
GetHourlyTimelineStats
(
MessageType
type
,
string
statusName
)
private
IDictionary
<
DateTime
,
int
>
GetHourlyTimelineStats
(
MessageType
type
,
string
statusName
)
{
{
var
collectionName
=
type
==
MessageType
.
Publish
?
_options
.
PublishedCollection
:
_options
.
ReceivedCollection
;
var
collectionName
=
type
==
MessageType
.
Publish
?
_options
.
PublishedCollection
:
_options
.
ReceivedCollection
;
var
endDate
=
DateTime
.
UtcNow
;
var
endDate
=
DateTime
.
UtcNow
;
var
groupby
=
new
BsonDocument
{
var
groupby
=
new
BsonDocument
{
{
{
"$group"
,
new
BsonDocument
{
"$group"
,
new
BsonDocument
{
"_id"
,
new
BsonDocument
{
{
{
"Key"
,
new
BsonDocument
{
{
{
"$dateToString"
,
new
BsonDocument
{
"_id"
,
new
BsonDocument
{
"format"
,
"%Y-%m-%d %H:00:00"
},
{
{
"date"
,
"$Added"
}
{
"Key"
,
new
BsonDocument
{
{
"$dateToString"
,
new
BsonDocument
{
{
"format"
,
"%Y-%m-%d %H:00:00"
},
{
"date"
,
"$Added"
}
}
}
}
}
}
}
}
}
}
}
},
},
{
"Count"
,
new
BsonDocument
{{
"$sum"
,
1
}}}
{
"Count"
,
new
BsonDocument
{{
"$sum"
,
1
}}}
}
}
}
}
};
};
var
match
=
new
BsonDocument
{
var
match
=
new
BsonDocument
{
"$match"
,
new
BsonDocument
{
{
{
"Added"
,
new
BsonDocument
{
"$match"
,
new
BsonDocument
{
{
"Added"
,
new
BsonDocument
{
{
{
"$gt"
,
endDate
.
AddHours
(-
24
)
}
{
"$gt"
,
endDate
.
AddHours
(-
24
)
}
}
}
},
},
{
"StatusName"
,
{
"StatusName"
,
new
BsonDocument
new
BsonDocument
{
{
{
"$eq"
,
statusName
}
{
"$eq"
,
statusName
}
}
}
}
}
}
}
}
}
};
};
var
pipeline
=
new
[]
{
match
,
groupby
};
var
pipeline
=
new
[]
{
match
,
groupby
};
var
collection
=
_database
.
GetCollection
<
BsonDocument
>(
collectionName
);
var
collection
=
_database
.
GetCollection
<
BsonDocument
>(
collectionName
);
var
result
=
collection
.
Aggregate
<
BsonDocument
>(
pipeline
).
ToList
();
var
result
=
collection
.
Aggregate
<
BsonDocument
>(
pipeline
).
ToList
();
...
@@ -175,6 +210,7 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -175,6 +210,7 @@ namespace DotNetCore.CAP.MongoDB
dic
.
Add
(
DateTime
.
Parse
(
endDate
.
ToLocalTime
().
ToString
(
"yyyy-MM-dd HH:00:00"
)),
0
);
dic
.
Add
(
DateTime
.
Parse
(
endDate
.
ToLocalTime
().
ToString
(
"yyyy-MM-dd HH:00:00"
)),
0
);
endDate
=
endDate
.
AddHours
(-
1
);
endDate
=
endDate
.
AddHours
(-
1
);
}
}
result
.
ForEach
(
d
=>
result
.
ForEach
(
d
=>
{
{
var
key
=
d
[
"_id"
].
AsBsonDocument
[
"Key"
].
AsString
;
var
key
=
d
[
"_id"
].
AsBsonDocument
[
"Key"
].
AsString
;
...
...
src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs
View file @
b67e8ebc
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using
System.Linq
;
using
System.Linq
;
using
System.Threading
;
using
System.Threading
;
using
System.Threading.Tasks
;
using
System.Threading.Tasks
;
...
@@ -11,9 +14,9 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -11,9 +14,9 @@ namespace DotNetCore.CAP.MongoDB
public
class
MongoDBStorage
:
IStorage
public
class
MongoDBStorage
:
IStorage
{
{
private
readonly
CapOptions
_capOptions
;
private
readonly
CapOptions
_capOptions
;
private
readonly
MongoDBOptions
_options
;
private
readonly
IMongoClient
_client
;
private
readonly
IMongoClient
_client
;
private
readonly
ILogger
<
MongoDBStorage
>
_logger
;
private
readonly
ILogger
<
MongoDBStorage
>
_logger
;
private
readonly
MongoDBOptions
_options
;
public
MongoDBStorage
(
CapOptions
capOptions
,
public
MongoDBStorage
(
CapOptions
capOptions
,
MongoDBOptions
options
,
MongoDBOptions
options
,
...
@@ -53,17 +56,18 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -53,17 +56,18 @@ namespace DotNetCore.CAP.MongoDB
if
(
names
.
All
(
n
=>
n
!=
_options
.
PublishedCollection
))
if
(
names
.
All
(
n
=>
n
!=
_options
.
PublishedCollection
))
{
{
await
database
.
CreateCollectionAsync
(
_options
.
PublishedCollection
,
cancellationToken
:
cancellationToken
);
await
database
.
CreateCollectionAsync
(
_options
.
PublishedCollection
,
cancellationToken
:
cancellationToken
);
}
}
if
(
names
.
All
(
n
=>
n
!=
"Counter"
))
if
(
names
.
All
(
n
=>
n
!=
"Counter"
))
{
{
await
database
.
CreateCollectionAsync
(
"Counter"
,
cancellationToken
:
cancellationToken
);
await
database
.
CreateCollectionAsync
(
"Counter"
,
cancellationToken
:
cancellationToken
);
var
collection
=
database
.
GetCollection
<
BsonDocument
>(
"Counter"
);
var
collection
=
database
.
GetCollection
<
BsonDocument
>(
"Counter"
);
await
collection
.
InsertManyAsync
(
new
BsonDocument
[]
await
collection
.
InsertManyAsync
(
new
[]
{
{
new
BsonDocument
{{
"_id"
,
_options
.
PublishedCollection
},
{
"sequence_value"
,
0
}},
new
BsonDocument
{{
"_id"
,
_options
.
PublishedCollection
},
{
"sequence_value"
,
0
}},
new
BsonDocument
{{
"_id"
,
_options
.
ReceivedCollection
},
{
"sequence_value"
,
0
}}
new
BsonDocument
{{
"_id"
,
_options
.
ReceivedCollection
},
{
"sequence_value"
,
0
}}
},
cancellationToken
:
cancellationToken
);
},
cancellationToken
:
cancellationToken
);
}
}
...
...
src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs
View file @
b67e8ebc
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using
System
;
using
System
;
using
System.Collections.Generic
;
using
System.Collections.Generic
;
using
System.Threading.Tasks
;
using
System.Threading.Tasks
;
using
DotNetCore.CAP.Infrastructure
;
using
DotNetCore.CAP.Infrastructure
;
using
DotNetCore.CAP.Models
;
using
DotNetCore.CAP.Models
;
using
MongoDB.Bson
;
using
MongoDB.Driver
;
using
MongoDB.Driver
;
namespace
DotNetCore.CAP.MongoDB
namespace
DotNetCore.CAP.MongoDB
...
@@ -11,9 +13,9 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -11,9 +13,9 @@ namespace DotNetCore.CAP.MongoDB
public
class
MongoDBStorageConnection
:
IStorageConnection
public
class
MongoDBStorageConnection
:
IStorageConnection
{
{
private
readonly
CapOptions
_capOptions
;
private
readonly
CapOptions
_capOptions
;
private
readonly
MongoDBOptions
_options
;
private
readonly
IMongoClient
_client
;
private
readonly
IMongoClient
_client
;
private
readonly
IMongoDatabase
_database
;
private
readonly
IMongoDatabase
_database
;
private
readonly
MongoDBOptions
_options
;
public
MongoDBStorageConnection
(
CapOptions
capOptions
,
MongoDBOptions
options
,
IMongoClient
client
)
public
MongoDBStorageConnection
(
CapOptions
capOptions
,
MongoDBOptions
options
,
IMongoClient
client
)
{
{
...
@@ -28,12 +30,12 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -28,12 +30,12 @@ namespace DotNetCore.CAP.MongoDB
var
collection
=
_database
.
GetCollection
<
CapPublishedMessage
>(
_options
.
PublishedCollection
);
var
collection
=
_database
.
GetCollection
<
CapPublishedMessage
>(
_options
.
PublishedCollection
);
var
updateDef
=
Builders
<
CapPublishedMessage
>
var
updateDef
=
Builders
<
CapPublishedMessage
>
.
Update
.
Inc
(
x
=>
x
.
Retries
,
1
)
.
Update
.
Inc
(
x
=>
x
.
Retries
,
1
)
.
Set
(
x
=>
x
.
ExpiresAt
,
null
)
.
Set
(
x
=>
x
.
ExpiresAt
,
null
)
.
Set
(
x
=>
x
.
StatusName
,
state
);
.
Set
(
x
=>
x
.
StatusName
,
state
);
var
result
=
var
result
=
collection
.
UpdateOne
(
x
=>
x
.
Id
==
messageId
,
updateDef
);
collection
.
UpdateOne
(
x
=>
x
.
Id
==
messageId
,
updateDef
);
return
result
.
ModifiedCount
>
0
;
return
result
.
ModifiedCount
>
0
;
}
}
...
@@ -43,12 +45,12 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -43,12 +45,12 @@ namespace DotNetCore.CAP.MongoDB
var
collection
=
_database
.
GetCollection
<
CapReceivedMessage
>(
_options
.
ReceivedCollection
);
var
collection
=
_database
.
GetCollection
<
CapReceivedMessage
>(
_options
.
ReceivedCollection
);
var
updateDef
=
Builders
<
CapReceivedMessage
>
var
updateDef
=
Builders
<
CapReceivedMessage
>
.
Update
.
Inc
(
x
=>
x
.
Retries
,
1
)
.
Update
.
Inc
(
x
=>
x
.
Retries
,
1
)
.
Set
(
x
=>
x
.
ExpiresAt
,
null
)
.
Set
(
x
=>
x
.
ExpiresAt
,
null
)
.
Set
(
x
=>
x
.
StatusName
,
state
);
.
Set
(
x
=>
x
.
StatusName
,
state
);
var
result
=
var
result
=
collection
.
UpdateOne
(
x
=>
x
.
Id
==
messageId
,
updateDef
);
collection
.
UpdateOne
(
x
=>
x
.
Id
==
messageId
,
updateDef
);
return
result
.
ModifiedCount
>
0
;
return
result
.
ModifiedCount
>
0
;
}
}
...
@@ -69,7 +71,8 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -69,7 +71,8 @@ namespace DotNetCore.CAP.MongoDB
var
fourMinsAgo
=
DateTime
.
Now
.
AddMinutes
(-
4
);
var
fourMinsAgo
=
DateTime
.
Now
.
AddMinutes
(-
4
);
var
collection
=
_database
.
GetCollection
<
CapPublishedMessage
>(
_options
.
PublishedCollection
);
var
collection
=
_database
.
GetCollection
<
CapPublishedMessage
>(
_options
.
PublishedCollection
);
return
await
collection
return
await
collection
.
Find
(
x
=>
x
.
Retries
<
_capOptions
.
FailedRetryCount
&&
x
.
Added
<
fourMinsAgo
&&
(
x
.
StatusName
==
StatusName
.
Failed
||
x
.
StatusName
==
StatusName
.
Scheduled
))
.
Find
(
x
=>
x
.
Retries
<
_capOptions
.
FailedRetryCount
&&
x
.
Added
<
fourMinsAgo
&&
(
x
.
StatusName
==
StatusName
.
Failed
||
x
.
StatusName
==
StatusName
.
Scheduled
))
.
Limit
(
200
)
.
Limit
(
200
)
.
ToListAsync
();
.
ToListAsync
();
}
}
...
@@ -86,7 +89,8 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -86,7 +89,8 @@ namespace DotNetCore.CAP.MongoDB
var
collection
=
_database
.
GetCollection
<
CapReceivedMessage
>(
_options
.
ReceivedCollection
);
var
collection
=
_database
.
GetCollection
<
CapReceivedMessage
>(
_options
.
ReceivedCollection
);
return
await
collection
return
await
collection
.
Find
(
x
=>
x
.
Retries
<
_capOptions
.
FailedRetryCount
&&
x
.
Added
<
fourMinsAgo
&&
(
x
.
StatusName
==
StatusName
.
Failed
||
x
.
StatusName
==
StatusName
.
Scheduled
))
.
Find
(
x
=>
x
.
Retries
<
_capOptions
.
FailedRetryCount
&&
x
.
Added
<
fourMinsAgo
&&
(
x
.
StatusName
==
StatusName
.
Failed
||
x
.
StatusName
==
StatusName
.
Scheduled
))
.
Limit
(
200
)
.
Limit
(
200
)
.
ToListAsync
();
.
ToListAsync
();
}
}
...
@@ -97,6 +101,7 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -97,6 +101,7 @@ namespace DotNetCore.CAP.MongoDB
{
{
throw
new
ArgumentNullException
(
nameof
(
message
));
throw
new
ArgumentNullException
(
nameof
(
message
));
}
}
var
collection
=
_database
.
GetCollection
<
CapReceivedMessage
>(
_options
.
ReceivedCollection
);
var
collection
=
_database
.
GetCollection
<
CapReceivedMessage
>(
_options
.
ReceivedCollection
);
message
.
Id
=
await
new
MongoDBUtil
().
GetNextSequenceValueAsync
(
_database
,
_options
.
ReceivedCollection
);
message
.
Id
=
await
new
MongoDBUtil
().
GetNextSequenceValueAsync
(
_database
,
_options
.
ReceivedCollection
);
...
@@ -105,9 +110,9 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -105,9 +110,9 @@ namespace DotNetCore.CAP.MongoDB
return
message
.
Id
;
return
message
.
Id
;
}
}
public
void
Dispose
()
public
void
Dispose
()
{
{
}
}
}
}
}
}
\ No newline at end of file
src/DotNetCore.CAP.MongoDB/MongoDBStorageTransaction.cs
View file @
b67e8ebc
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using
System
;
using
System.Threading.Tasks
;
using
System.Threading.Tasks
;
using
DotNetCore.CAP.Models
;
using
DotNetCore.CAP.Models
;
using
MongoDB.Driver
;
using
MongoDB.Driver
;
using
System
;
namespace
DotNetCore.CAP.MongoDB
namespace
DotNetCore.CAP.MongoDB
{
{
internal
class
MongoDBStorageTransaction
:
IStorageTransaction
internal
class
MongoDBStorageTransaction
:
IStorageTransaction
{
{
private
readonly
MongoDBOptions
_options
;
private
readonly
IMongoDatabase
_database
;
private
readonly
IMongoDatabase
_database
;
private
readonly
MongoDBOptions
_options
;
private
readonly
IClientSessionHandle
_session
;
private
readonly
IClientSessionHandle
_session
;
public
MongoDBStorageTransaction
(
IMongoClient
client
,
MongoDBOptions
options
)
public
MongoDBStorageTransaction
(
IMongoClient
client
,
MongoDBOptions
options
)
...
...
src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs
View file @
b67e8ebc
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using
System
;
using
System
;
using
System.Threading.Tasks
;
using
System.Threading.Tasks
;
using
MongoDB.Bson
;
using
MongoDB.Bson
;
...
@@ -7,17 +10,19 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -7,17 +10,19 @@ namespace DotNetCore.CAP.MongoDB
{
{
internal
class
MongoDBUtil
internal
class
MongoDBUtil
{
{
readonly
FindOneAndUpdateOptions
<
BsonDocument
>
_options
=
new
FindOneAndUpdateOptions
<
BsonDocument
>()
private
readonly
FindOneAndUpdateOptions
<
BsonDocument
>
_options
=
new
FindOneAndUpdateOptions
<
BsonDocument
>
{
{
ReturnDocument
=
ReturnDocument
.
After
ReturnDocument
=
ReturnDocument
.
After
};
};
public
async
Task
<
int
>
GetNextSequenceValueAsync
(
IMongoDatabase
database
,
string
collectionName
,
IClientSessionHandle
session
=
null
)
public
async
Task
<
int
>
GetNextSequenceValueAsync
(
IMongoDatabase
database
,
string
collectionName
,
IClientSessionHandle
session
=
null
)
{
{
//https://www.tutorialspoint.com/mongodb/mongodb_autoincrement_sequence.htm
//https://www.tutorialspoint.com/mongodb/mongodb_autoincrement_sequence.htm
var
collection
=
database
.
GetCollection
<
BsonDocument
>(
"Counter"
);
var
collection
=
database
.
GetCollection
<
BsonDocument
>(
"Counter"
);
var
updateDef
=
Builders
<
BsonDocument
>.
Update
.
Inc
(
"sequence_value"
,
1
);
var
updateDef
=
Builders
<
BsonDocument
>.
Update
.
Inc
(
"sequence_value"
,
1
);
var
filter
=
new
BsonDocument
{
{
"_id"
,
collectionName
}
};
var
filter
=
new
BsonDocument
{
{
"_id"
,
collectionName
}
};
BsonDocument
result
;
BsonDocument
result
;
if
(
session
==
null
)
if
(
session
==
null
)
...
@@ -33,14 +38,16 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -33,14 +38,16 @@ namespace DotNetCore.CAP.MongoDB
{
{
return
value
.
ToInt32
();
return
value
.
ToInt32
();
}
}
throw
new
Exception
(
"Unable to get next sequence value."
);
throw
new
Exception
(
"Unable to get next sequence value."
);
}
}
public
int
GetNextSequenceValue
(
IMongoDatabase
database
,
string
collectionName
,
IClientSessionHandle
session
=
null
)
public
int
GetNextSequenceValue
(
IMongoDatabase
database
,
string
collectionName
,
IClientSessionHandle
session
=
null
)
{
{
var
collection
=
database
.
GetCollection
<
BsonDocument
>(
"Counter"
);
var
collection
=
database
.
GetCollection
<
BsonDocument
>(
"Counter"
);
var
filter
=
new
BsonDocument
{
{
"_id"
,
collectionName
}
};
var
filter
=
new
BsonDocument
{
{
"_id"
,
collectionName
}
};
var
updateDef
=
Builders
<
BsonDocument
>.
Update
.
Inc
(
"sequence_value"
,
1
);
var
updateDef
=
Builders
<
BsonDocument
>.
Update
.
Inc
(
"sequence_value"
,
1
);
var
result
=
session
==
null
var
result
=
session
==
null
...
@@ -51,6 +58,7 @@ namespace DotNetCore.CAP.MongoDB
...
@@ -51,6 +58,7 @@ namespace DotNetCore.CAP.MongoDB
{
{
return
value
.
ToInt32
();
return
value
.
ToInt32
();
}
}
throw
new
Exception
(
"Unable to get next sequence value."
);
throw
new
Exception
(
"Unable to get next sequence value."
);
}
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment